saorsa_core/
network.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: david@saorsalabs.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! Network module
15//!
16//! This module provides core networking functionality for the P2P Foundation.
17//! It handles peer connections, network events, and node lifecycle management.
18
19use crate::bgp_geo_provider::BgpGeoProvider;
20use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
21use crate::config::Config;
22use crate::control::RejectionMessage;
23use crate::dht::DHT;
24use crate::error::{NetworkError, P2PError, P2pResult as Result};
25use crate::identity::rejection::RejectionReason;
26use crate::security::GeoProvider;
27
28use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
29use crate::transport::ant_quic_adapter::DualStackNetworkNode;
30#[allow(unused_imports)] // Temporarily unused during migration
31use crate::transport::{TransportOptions, TransportType};
32use crate::validation::RateLimitConfig;
33use crate::validation::RateLimiter;
34use crate::{NetworkAddress, PeerId};
35use serde::{Deserialize, Serialize};
36use std::collections::{HashMap, HashSet};
37use std::sync::Arc;
38use std::sync::atomic::{AtomicBool, Ordering};
39use std::time::Duration;
40use tokio::sync::{RwLock, broadcast};
41use tokio::time::Instant;
42use tracing::{debug, error, info, trace, warn};
43
/// Configuration for a P2P node
///
/// Instances are produced by `NodeConfig::new`, `NodeConfig::from_config`,
/// `NodeConfig::with_listen_addr` or `Default::default()` and are consumed by
/// `P2PNode::new`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeConfig {
    /// Local peer ID for this node.
    ///
    /// When `None`, `P2PNode::new` generates a random `peer_`-prefixed ID.
    pub peer_id: Option<PeerId>,

    /// Addresses to listen on for incoming connections
    pub listen_addrs: Vec<std::net::SocketAddr>,

    /// Primary listen address (for compatibility).
    ///
    /// `P2PNode::new` derives the transport bind addresses from this field's
    /// IP and port.
    pub listen_addr: std::net::SocketAddr,

    /// Bootstrap peers to connect to on startup (legacy)
    pub bootstrap_peers: Vec<std::net::SocketAddr>,

    /// Bootstrap peers as strings (for integration tests)
    pub bootstrap_peers_str: Vec<String>,

    /// Enable IPv6 support.
    ///
    /// When `false`, `P2PNode::new` does not request an IPv6 bind address.
    pub enable_ipv6: bool,

    // MCP removed; will be redesigned later
    /// Connection timeout duration
    pub connection_timeout: Duration,

    /// Keep-alive interval for connections
    pub keep_alive_interval: Duration,

    /// Maximum number of concurrent connections
    pub max_connections: usize,

    /// Maximum number of incoming connections.
    ///
    /// The constructors in this file fill it from
    /// `config.security.connection_limit`.
    pub max_incoming_connections: usize,

    /// DHT configuration
    pub dht_config: DHTConfig,

    /// Security configuration
    pub security_config: SecurityConfig,

    /// Production hardening configuration.
    ///
    /// When `Some`, `P2PNode::new` builds a `ResourceManager` from it.
    pub production_config: Option<ProductionConfig>,

    /// Bootstrap cache configuration.
    ///
    /// When `None`, `P2PNode::new` uses `crate::bootstrap::CacheConfig::default()`.
    pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,

    /// Optional IP diversity configuration for Sybil protection tuning.
    ///
    /// When set, this configuration is used by bootstrap peer discovery and
    /// other diversity-enforcing subsystems. If `None`, defaults are used.
    pub diversity_config: Option<crate::security::IPDiversityConfig>,

    /// Attestation configuration for software integrity verification.
    ///
    /// Controls how nodes verify each other's software attestation during handshake.
    /// In Phase 1, this is used for "soft enforcement" (logging only).
    ///
    /// `#[serde(default)]` lets serialized configs omit this field.
    #[serde(default)]
    pub attestation_config: crate::attestation::AttestationConfig,
}
103
/// DHT-specific configuration
///
/// Carried inside `NodeConfig::dht_config`; see the `Default` impl for the
/// stock values.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DHTConfig {
    /// Kademlia K parameter (bucket size)
    pub k_value: usize,

    /// Kademlia alpha parameter (lookup parallelism)
    pub alpha_value: usize,

    /// DHT record TTL (time-to-live of a stored record)
    pub record_ttl: Duration,

    /// DHT refresh interval
    pub refresh_interval: Duration,
}
119
/// Security configuration
///
/// Carried inside `NodeConfig::security_config`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SecurityConfig {
    /// Enable noise protocol for encryption
    pub enable_noise: bool,

    /// Enable TLS for secure transport
    pub enable_tls: bool,

    /// Trust level for peer verification (see `TrustLevel`)
    pub trust_level: TrustLevel,
}
132
/// Trust level for peer verification
///
/// Variants are listed from least to most strict.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum TrustLevel {
    /// No verification required
    None,
    /// Basic peer ID verification
    Basic,
    /// Full cryptographic verification
    Full,
}
143
144impl NodeConfig {
145    /// Create a new NodeConfig with default values
146    ///
147    /// # Errors
148    ///
149    /// Returns an error if default addresses cannot be parsed
150    pub fn new() -> Result<Self> {
151        // Load config and use its defaults
152        let config = Config::default();
153
154        // Parse the default listen address
155        let listen_addr = config.listen_socket_addr()?;
156
157        // Create listen addresses based on config
158        let mut listen_addrs = vec![];
159
160        // Add IPv6 address if enabled
161        if config.network.ipv6_enabled {
162            let ipv6_addr = std::net::SocketAddr::new(
163                std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
164                listen_addr.port(),
165            );
166            listen_addrs.push(ipv6_addr);
167        }
168
169        // Always add IPv4
170        let ipv4_addr = std::net::SocketAddr::new(
171            std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
172            listen_addr.port(),
173        );
174        listen_addrs.push(ipv4_addr);
175
176        Ok(Self {
177            peer_id: None,
178            listen_addrs,
179            listen_addr,
180            bootstrap_peers: Vec::new(),
181            bootstrap_peers_str: config.network.bootstrap_nodes.clone(),
182            enable_ipv6: config.network.ipv6_enabled,
183
184            connection_timeout: Duration::from_secs(config.network.connection_timeout),
185            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
186            max_connections: config.network.max_connections,
187            max_incoming_connections: config.security.connection_limit as usize,
188            dht_config: DHTConfig::default(),
189            security_config: SecurityConfig::default(),
190            production_config: None,
191            bootstrap_cache_config: None,
192            diversity_config: None,
193            attestation_config: config.attestation.clone(),
194        })
195    }
196}
197
198impl Default for NodeConfig {
199    fn default() -> Self {
200        // Use config defaults for network settings
201        let config = Config::default();
202
203        // Parse the default listen address - use safe fallback if parsing fails
204        let listen_addr = config.listen_socket_addr().unwrap_or_else(|_| {
205            std::net::SocketAddr::new(
206                std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
207                9000,
208            )
209        });
210
211        Self {
212            peer_id: None,
213            listen_addrs: vec![
214                std::net::SocketAddr::new(
215                    std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
216                    listen_addr.port(),
217                ),
218                std::net::SocketAddr::new(
219                    std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
220                    listen_addr.port(),
221                ),
222            ],
223            listen_addr,
224            bootstrap_peers: Vec::new(),
225            bootstrap_peers_str: Vec::new(),
226            enable_ipv6: config.network.ipv6_enabled,
227
228            connection_timeout: Duration::from_secs(config.network.connection_timeout),
229            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
230            max_connections: config.network.max_connections,
231            max_incoming_connections: config.security.connection_limit as usize,
232            dht_config: DHTConfig::default(),
233            security_config: SecurityConfig::default(),
234            production_config: None, // Use default production config if enabled
235            bootstrap_cache_config: None,
236            diversity_config: None,
237            attestation_config: config.attestation.clone(),
238        }
239    }
240}
241
242impl NodeConfig {
243    /// Create NodeConfig from Config
244    pub fn from_config(config: &Config) -> Result<Self> {
245        let listen_addr = config.listen_socket_addr()?;
246        let bootstrap_addrs = config.bootstrap_addrs()?;
247
248        let mut node_config = Self {
249            peer_id: None,
250            listen_addrs: vec![listen_addr],
251            listen_addr,
252            bootstrap_peers: bootstrap_addrs
253                .iter()
254                .map(|addr| addr.socket_addr())
255                .collect(),
256            bootstrap_peers_str: config
257                .network
258                .bootstrap_nodes
259                .iter()
260                .map(|addr| addr.to_string())
261                .collect(),
262            enable_ipv6: config.network.ipv6_enabled,
263
264            connection_timeout: Duration::from_secs(config.network.connection_timeout),
265            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
266            max_connections: config.network.max_connections,
267            max_incoming_connections: config.security.connection_limit as usize,
268            dht_config: DHTConfig {
269                k_value: 20,
270                alpha_value: 3,
271                record_ttl: Duration::from_secs(3600),
272                refresh_interval: Duration::from_secs(900),
273            },
274            security_config: SecurityConfig {
275                enable_noise: true,
276                enable_tls: true,
277                trust_level: TrustLevel::Basic,
278            },
279            production_config: Some(ProductionConfig {
280                max_connections: config.network.max_connections,
281                max_memory_bytes: 0,  // unlimited
282                max_bandwidth_bps: 0, // unlimited
283                connection_timeout: Duration::from_secs(config.network.connection_timeout),
284                keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
285                health_check_interval: Duration::from_secs(30),
286                metrics_interval: Duration::from_secs(60),
287                enable_performance_tracking: true,
288                enable_auto_cleanup: true,
289                shutdown_timeout: Duration::from_secs(30),
290                rate_limits: crate::production::RateLimitConfig::default(),
291            }),
292            bootstrap_cache_config: None,
293            diversity_config: None,
294            attestation_config: config.attestation.clone(),
295        };
296
297        // Add IPv6 listen address if enabled
298        if config.network.ipv6_enabled {
299            node_config.listen_addrs.push(std::net::SocketAddr::new(
300                std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
301                listen_addr.port(),
302            ));
303        }
304
305        Ok(node_config)
306    }
307
308    /// Try to build a NodeConfig from a listen address string
309    pub fn with_listen_addr(addr: &str) -> Result<Self> {
310        let listen_addr: std::net::SocketAddr = addr
311            .parse()
312            .map_err(|e: std::net::AddrParseError| {
313                NetworkError::InvalidAddress(e.to_string().into())
314            })
315            .map_err(P2PError::Network)?;
316        let cfg = NodeConfig {
317            listen_addr,
318            listen_addrs: vec![listen_addr],
319            diversity_config: None,
320            ..Default::default()
321        };
322        Ok(cfg)
323    }
324}
325
326impl Default for DHTConfig {
327    fn default() -> Self {
328        Self {
329            k_value: 20,
330            alpha_value: 5,
331            record_ttl: Duration::from_secs(3600), // 1 hour
332            refresh_interval: Duration::from_secs(600), // 10 minutes
333        }
334    }
335}
336
337impl Default for SecurityConfig {
338    fn default() -> Self {
339        Self {
340            enable_noise: true,
341            enable_tls: true,
342            trust_level: TrustLevel::Basic,
343        }
344    }
345}
346
/// Information about a connected peer
///
/// Stored per peer in the `peers` map of `P2PNode`.
#[derive(Debug, Clone)]
pub struct PeerInfo {
    /// Peer identifier
    pub peer_id: PeerId,

    /// Peer's addresses, in string form
    pub addresses: Vec<String>,

    /// Connection timestamp (`tokio::time::Instant`)
    pub connected_at: Instant,

    /// Last seen timestamp (`tokio::time::Instant`)
    pub last_seen: Instant,

    /// Connection status
    pub status: ConnectionStatus,

    /// Supported protocols
    pub protocols: Vec<String>,

    /// Number of heartbeats received
    pub heartbeat_count: u64,
}
371
/// Connection status for a peer
///
/// Recorded in `PeerInfo::status`.
#[derive(Debug, Clone, PartialEq)]
pub enum ConnectionStatus {
    /// Connection is being established
    Connecting,
    /// Connection is established and active
    Connected,
    /// Connection is being closed
    Disconnecting,
    /// Connection is closed
    Disconnected,
    /// Connection failed; the payload is a description of the failure
    Failed(String),
}
386
/// Network events that can occur
///
/// Covers peer lifecycle, message arrival, connection failures and DHT
/// record operations.
#[derive(Debug, Clone)]
pub enum NetworkEvent {
    /// A new peer has connected
    PeerConnected {
        /// The identifier of the newly connected peer
        peer_id: PeerId,
        /// The network addresses where the peer can be reached
        addresses: Vec<String>,
    },

    /// A peer has disconnected
    PeerDisconnected {
        /// The identifier of the disconnected peer
        peer_id: PeerId,
        /// The reason for the disconnection
        reason: String,
    },

    /// A message was received from a peer
    MessageReceived {
        /// The identifier of the sending peer
        peer_id: PeerId,
        /// The protocol used for the message
        protocol: String,
        /// The raw message data
        data: Vec<u8>,
    },

    /// A connection attempt failed
    ConnectionFailed {
        /// The identifier of the peer (if known)
        peer_id: Option<PeerId>,
        /// The address where connection was attempted
        address: String,
        /// The error message describing the failure
        error: String,
    },

    /// DHT record was stored
    DHTRecordStored {
        /// The DHT key where the record was stored
        key: Vec<u8>,
        /// The value that was stored
        value: Vec<u8>,
    },

    /// DHT record was retrieved
    DHTRecordRetrieved {
        /// The DHT key that was queried
        key: Vec<u8>,
        /// The retrieved value, or `None` if nothing was found
        value: Option<Vec<u8>>,
    },
}
442
/// Events emitted by a `P2PNode`
///
/// This is the payload type of the node's `broadcast` event channel, so
/// every subscriber receives its own copy of each event. It provides
/// real-time notifications of peer connects/disconnects and message arrivals.
#[derive(Debug, Clone)]
pub enum P2PEvent {
    /// Message received from a peer on a specific topic
    Message {
        /// Topic or channel the message was sent on
        topic: String,
        /// Peer ID of the message sender
        source: PeerId,
        /// Raw message data payload
        data: Vec<u8>,
    },
    /// A new peer has connected to the network
    PeerConnected(PeerId),
    /// A peer has disconnected from the network
    PeerDisconnected(PeerId),
}
463
/// Main P2P network node that manages connections, routing, and communication
///
/// This struct represents a complete P2P network participant that can:
/// - Connect to other peers via QUIC transport
/// - Participate in distributed hash table (DHT) operations
/// - Send and receive messages through various protocols
/// - Handle network events and peer lifecycle
///
/// MCP (Model Context Protocol) support has been removed from this module.
pub struct P2PNode {
    /// Node configuration
    config: NodeConfig,

    /// Our peer ID
    peer_id: PeerId,

    /// Connected peers, keyed by peer ID
    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,

    /// Network event broadcaster (sender half of the `P2PEvent` channel)
    event_tx: broadcast::Sender<P2PEvent>,

    /// Listen addresses (both constructors initialise this to an empty list)
    listen_addrs: RwLock<Vec<std::net::SocketAddr>>,

    /// Node start time
    start_time: Instant,

    /// Running state (both constructors initialise this to `false`)
    running: RwLock<bool>,

    /// DHT instance (optional)
    dht: Option<Arc<RwLock<DHT>>>,

    /// Production resource manager (optional; present when
    /// `NodeConfig::production_config` is set)
    resource_manager: Option<Arc<ResourceManager>>,

    /// Bootstrap cache manager for peer discovery
    /// (`None` when initialisation failed or in the test constructor)
    bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,

    /// Dual-stack ant-quic nodes (IPv6 + IPv4) with Happy Eyeballs dialing
    dual_node: Arc<DualStackNetworkNode>,

    /// Rate limiter for connection and request throttling
    #[allow(dead_code)]
    rate_limiter: Arc<RateLimiter>,

    /// Active connections (tracked by peer_id)
    /// This set is synchronized with ant-quic's connection state via event monitoring
    active_connections: Arc<RwLock<HashSet<PeerId>>>,

    /// Security dashboard for monitoring
    pub security_dashboard: Option<Arc<crate::dht::metrics::SecurityDashboard>>,

    /// Connection lifecycle monitor task handle
    #[allow(dead_code)]
    connection_monitor_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,

    /// Keepalive task handle
    #[allow(dead_code)]
    keepalive_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,

    /// Shutdown flag for background tasks
    #[allow(dead_code)]
    shutdown: Arc<AtomicBool>,

    /// GeoIP provider for connection validation
    #[allow(dead_code)]
    geo_provider: Arc<BgpGeoProvider>,

    /// This node's entangled identity (derived from public key + binary hash + nonce).
    /// Used for software attestation verification during peer handshake.
    /// Both constructors leave this as `None`.
    entangled_id: Option<crate::attestation::EntangledId>,

    /// BLAKE3 hash of the running binary for attestation.
    /// In production, this is computed at startup from the executable.
    /// The test constructor uses all zeros.
    binary_hash: [u8; 32],
}
542
/// Map a wildcard ("unspecified") bind address to the matching loopback address.
///
/// Unspecified addresses (`0.0.0.0` and `[::]`) are only meaningful for
/// BINDING a socket; nobody can connect TO them, and ant-quic rejects them
/// as remote addresses. For local connections this helper rewrites them:
/// - IPv4 `0.0.0.0:port` → `127.0.0.1:port`
/// - IPv6 `[::]:port` → `[::1]:port`
/// - anything else is returned untouched
///
/// # Arguments
/// * `addr` - The socket address to normalize
///
/// # Returns
/// * A socket address suitable for use as a connection target
fn normalize_wildcard_to_loopback(addr: std::net::SocketAddr) -> std::net::SocketAddr {
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};

    let loopback = match addr.ip() {
        IpAddr::V4(v4) if v4.is_unspecified() => IpAddr::V4(Ipv4Addr::LOCALHOST), // 127.0.0.1
        IpAddr::V6(v6) if v6.is_unspecified() => IpAddr::V6(Ipv6Addr::LOCALHOST), // ::1
        // Concrete address: nothing to rewrite.
        _ => return addr,
    };

    SocketAddr::new(loopback, addr.port())
}
573
574impl P2PNode {
575    /// Minimal constructor for tests that avoids real networking
576    pub fn new_for_tests() -> Result<Self> {
577        let (event_tx, _) = broadcast::channel(16);
578        Ok(Self {
579            config: NodeConfig::default(),
580            peer_id: "test_peer".to_string(),
581            peers: Arc::new(RwLock::new(HashMap::new())),
582            event_tx,
583            listen_addrs: RwLock::new(Vec::new()),
584            start_time: Instant::now(),
585            running: RwLock::new(false),
586            dht: None,
587            resource_manager: None,
588            bootstrap_manager: None,
589            dual_node: {
590                // Bind dual-stack nodes on ephemeral ports for tests
591                let v6: Option<std::net::SocketAddr> = "[::1]:0"
592                    .parse()
593                    .ok()
594                    .or(Some(std::net::SocketAddr::from(([0, 0, 0, 0], 0))));
595                let v4: Option<std::net::SocketAddr> = "127.0.0.1:0".parse().ok();
596                let handle = tokio::runtime::Handle::current();
597                let dual_attempt = handle.block_on(
598                    crate::transport::ant_quic_adapter::DualStackNetworkNode::new(v6, v4),
599                );
600                let dual = match dual_attempt {
601                    Ok(d) => d,
602                    Err(_e1) => {
603                        // Fallback to IPv4-only ephemeral bind
604                        let fallback = handle.block_on(
605                            crate::transport::ant_quic_adapter::DualStackNetworkNode::new(
606                                None,
607                                "127.0.0.1:0".parse().ok(),
608                            ),
609                        );
610                        match fallback {
611                            Ok(d) => d,
612                            Err(e2) => {
613                                return Err(P2PError::Network(NetworkError::BindError(
614                                    format!("Failed to create dual-stack network node: {}", e2)
615                                        .into(),
616                                )));
617                            }
618                        }
619                    }
620                };
621                Arc::new(dual)
622            },
623            rate_limiter: Arc::new(RateLimiter::new(RateLimitConfig {
624                max_requests: 100,
625                burst_size: 100,
626                window: std::time::Duration::from_secs(1),
627                ..Default::default()
628            })),
629            active_connections: Arc::new(RwLock::new(HashSet::new())),
630            connection_monitor_handle: Arc::new(RwLock::new(None)),
631            keepalive_handle: Arc::new(RwLock::new(None)),
632            shutdown: Arc::new(AtomicBool::new(false)),
633            geo_provider: Arc::new(BgpGeoProvider::new()),
634            security_dashboard: None,
635            // Attestation fields - use dummy values for tests
636            entangled_id: None,
637            binary_hash: [0u8; 32],
638        })
639    }
640    /// Create a new P2P node with the given configuration
641    pub async fn new(config: NodeConfig) -> Result<Self> {
642        let peer_id = config.peer_id.clone().unwrap_or_else(|| {
643            // Generate a random peer ID for now
644            format!("peer_{}", &uuid::Uuid::new_v4().to_string()[..8])
645        });
646
647        let (event_tx, _) = broadcast::channel(1000);
648
649        // Initialize and register a TrustWeightedKademlia DHT for the global API
650        // Use a deterministic local NodeId derived from the peer_id
651        {
652            use blake3::Hasher;
653            let mut hasher = Hasher::new();
654            hasher.update(peer_id.as_bytes());
655            let digest = hasher.finalize();
656            let mut nid = [0u8; 32];
657            nid.copy_from_slice(digest.as_bytes());
658            let _twdht = std::sync::Arc::new(crate::dht::TrustWeightedKademlia::new(
659                crate::identity::node_identity::NodeId::from_bytes(nid),
660            ));
661            // TODO: Update to use new clean API
662            // let _ = crate::api::set_dht_instance(twdht);
663        }
664
665        // Initialize DHT if needed
666        let (dht, security_dashboard) = if true {
667            // Assuming DHT is always enabled for now, or check config
668            let _dht_config = crate::dht::DHTConfig {
669                replication_factor: config.dht_config.k_value,
670                bucket_size: config.dht_config.k_value,
671                alpha: config.dht_config.alpha_value,
672                record_ttl: config.dht_config.record_ttl,
673                bucket_refresh_interval: config.dht_config.refresh_interval,
674                republish_interval: config.dht_config.refresh_interval,
675                max_distance: 160,
676            };
677            // Convert peer_id String to NodeId
678            let peer_bytes = peer_id.as_bytes();
679            let mut node_id_bytes = [0u8; 32];
680            let len = peer_bytes.len().min(32);
681            node_id_bytes[..len].copy_from_slice(&peer_bytes[..len]);
682            let node_id = crate::dht::core_engine::NodeId::from_bytes(node_id_bytes);
683            let dht_instance = DHT::new(node_id).map_err(|e| {
684                crate::error::P2PError::Dht(crate::error::DhtError::StoreFailed(
685                    e.to_string().into(),
686                ))
687            })?;
688            dht_instance.start_maintenance_tasks();
689
690            // Create Security Dashboard
691            let security_metrics = dht_instance.security_metrics();
692            let dashboard = crate::dht::metrics::SecurityDashboard::new(
693                security_metrics,
694                Arc::new(crate::dht::metrics::DhtMetricsCollector::new()),
695                Arc::new(crate::dht::metrics::TrustMetricsCollector::new()),
696                Arc::new(crate::dht::metrics::PlacementMetricsCollector::new()),
697            );
698
699            (
700                Some(Arc::new(RwLock::new(dht_instance))),
701                Some(Arc::new(dashboard)),
702            )
703        } else {
704            (None, None)
705        };
706
707        // MCP removed
708
709        // Initialize production resource manager if configured
710        let resource_manager = config
711            .production_config
712            .clone()
713            .map(|prod_config| Arc::new(ResourceManager::new(prod_config)));
714
715        // Initialize bootstrap cache manager
716        let diversity_config = config.diversity_config.clone().unwrap_or_default();
717        let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
718            match BootstrapManager::with_full_config(
719                cache_config.clone(),
720                crate::rate_limit::JoinRateLimiterConfig::default(),
721                diversity_config.clone(),
722            )
723            .await
724            {
725                Ok(manager) => Some(Arc::new(RwLock::new(manager))),
726                Err(e) => {
727                    warn!(
728                        "Failed to initialize bootstrap manager: {}, continuing without cache",
729                        e
730                    );
731                    None
732                }
733            }
734        } else {
735            match BootstrapManager::with_full_config(
736                crate::bootstrap::CacheConfig::default(),
737                crate::rate_limit::JoinRateLimiterConfig::default(),
738                diversity_config,
739            )
740            .await
741            {
742                Ok(manager) => Some(Arc::new(RwLock::new(manager))),
743                Err(e) => {
744                    warn!(
745                        "Failed to initialize bootstrap manager: {}, continuing without cache",
746                        e
747                    );
748                    None
749                }
750            }
751        };
752
753        // Initialize dual-stack ant-quic nodes
754        // Determine bind addresses
755        let (v6_opt, v4_opt) = {
756            let port = config.listen_addr.port();
757            let ip = config.listen_addr.ip();
758
759            let v4_addr = if ip.is_ipv4() {
760                Some(std::net::SocketAddr::new(ip, port))
761            } else {
762                // If config is IPv6, we still might want IPv4 on UNSPECIFIED if dual stack is desired
763                // But for now let's just stick to defaults if not specified
764                Some(std::net::SocketAddr::new(
765                    std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
766                    port,
767                ))
768            };
769
770            let v6_addr = if config.enable_ipv6 {
771                if ip.is_ipv6() {
772                    Some(std::net::SocketAddr::new(ip, port))
773                } else {
774                    Some(std::net::SocketAddr::new(
775                        std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
776                        port,
777                    ))
778                }
779            } else {
780                None
781            };
782            (v6_addr, v4_addr)
783        };
784
785        let dual_node = Arc::new(
786            DualStackNetworkNode::new(v6_opt, v4_opt)
787                .await
788                .map_err(|e| {
789                    P2PError::Transport(crate::error::TransportError::SetupFailed(
790                        format!("Failed to create dual-stack network nodes: {}", e).into(),
791                    ))
792                })?,
793        );
794
795        // Initialize rate limiter with default config
796        let rate_limiter = Arc::new(RateLimiter::new(
797            crate::validation::RateLimitConfig::default(),
798        ));
799
800        // Create active connections tracker
801        let active_connections = Arc::new(RwLock::new(HashSet::new()));
802
803        // Initialize GeoIP provider
804        let geo_provider = Arc::new(BgpGeoProvider::new());
805
806        // Create peers map
807        let peers = Arc::new(RwLock::new(HashMap::new()));
808
809        // Start connection lifecycle monitor
810        // CRITICAL: Subscribe to connection events BEFORE spawning the task
811        // to avoid race condition where early connections are missed
812        let connection_event_rx = dual_node.subscribe_connection_events();
813
814        let connection_monitor_handle = {
815            let active_conns = Arc::clone(&active_connections);
816            let peers_map = Arc::clone(&peers);
817            let event_tx_clone = event_tx.clone();
818            let dual_node_clone = Arc::clone(&dual_node);
819            let geo_provider_clone = Arc::clone(&geo_provider);
820            let peer_id_clone = peer_id.clone();
821
822            let handle = tokio::spawn(async move {
823                Self::connection_lifecycle_monitor_with_rx(
824                    dual_node_clone,
825                    connection_event_rx,
826                    active_conns,
827                    peers_map,
828                    event_tx_clone,
829                    geo_provider_clone,
830                    peer_id_clone,
831                )
832                .await;
833            });
834
835            Arc::new(RwLock::new(Some(handle)))
836        };
837
838        // Spawn keepalive task
839        let shutdown = Arc::new(AtomicBool::new(false));
840        let keepalive_handle = {
841            let active_conns = Arc::clone(&active_connections);
842            let dual_node_clone = Arc::clone(&dual_node);
843            let shutdown_clone = Arc::clone(&shutdown);
844
845            let handle = tokio::spawn(async move {
846                Self::keepalive_task(active_conns, dual_node_clone, shutdown_clone).await;
847            });
848
849            Arc::new(RwLock::new(Some(handle)))
850        };
851
852        // Compute binary hash for attestation (in production, this would be the actual binary)
853        // For now, we use a placeholder that will be replaced during node initialization
854        let binary_hash = Self::compute_binary_hash();
855
856        let node = Self {
857            config,
858            peer_id,
859            peers,
860            event_tx,
861            listen_addrs: RwLock::new(Vec::new()),
862            start_time: Instant::now(),
863            running: RwLock::new(false),
864            dht,
865            resource_manager,
866            bootstrap_manager,
867            dual_node,
868            rate_limiter,
869            active_connections,
870            security_dashboard,
871            connection_monitor_handle,
872            keepalive_handle,
873            shutdown,
874            geo_provider,
875            // Attestation - EntangledId will be derived later when NodeIdentity is available
876            entangled_id: None,
877            binary_hash,
878        };
879        info!("Created P2P node with peer ID: {}", node.peer_id);
880
881        // Start the network listeners to populate listen addresses
882        node.start_network_listeners().await?;
883
884        // Update the connection monitor with actual peers reference
885        node.start_connection_monitor().await;
886
887        Ok(node)
888    }
889
890    /// Create a new node builder
891    pub fn builder() -> NodeBuilder {
892        NodeBuilder::new()
893    }
894
895    /// Get the peer ID of this node
896    pub fn peer_id(&self) -> &PeerId {
897        &self.peer_id
898    }
899
900    pub fn local_addr(&self) -> Option<String> {
901        self.listen_addrs
902            .try_read()
903            .ok()
904            .and_then(|addrs| addrs.first().map(|a| a.to_string()))
905    }
906
907    pub async fn subscribe(&self, topic: &str) -> Result<()> {
908        // In a real implementation, this would register the topic with the pubsub mechanism.
909        // For now, we just log it.
910        info!("Subscribed to topic: {}", topic);
911        Ok(())
912    }
913
914    pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
915        info!(
916            "Publishing message to topic: {} ({} bytes)",
917            topic,
918            data.len()
919        );
920
921        // Get list of connected peers
922        let peer_list: Vec<PeerId> = {
923            let peers_guard = self.peers.read().await;
924            peers_guard.keys().cloned().collect()
925        };
926
927        if peer_list.is_empty() {
928            debug!("No peers connected, message will only be sent to local subscribers");
929        } else {
930            // Send message to all connected peers
931            let mut send_count = 0;
932            for peer_id in &peer_list {
933                match self.send_message(peer_id, topic, data.to_vec()).await {
934                    Ok(_) => {
935                        send_count += 1;
936                        debug!("Sent message to peer: {}", peer_id);
937                    }
938                    Err(e) => {
939                        warn!("Failed to send message to peer {}: {}", peer_id, e);
940                    }
941                }
942            }
943            info!(
944                "Published message to {}/{} connected peers",
945                send_count,
946                peer_list.len()
947            );
948        }
949
950        // Also send to local subscribers (for local echo and testing)
951        let event = P2PEvent::Message {
952            topic: topic.to_string(),
953            source: self.peer_id.clone(),
954            data: data.to_vec(),
955        };
956        let _ = self.event_tx.send(event);
957
958        Ok(())
959    }
960
961    /// Get the node configuration
962    pub fn config(&self) -> &NodeConfig {
963        &self.config
964    }
965
966    /// Start the P2P node
967    pub async fn start(&self) -> Result<()> {
968        info!("Starting P2P node...");
969
970        // Start production resource manager if configured
971        if let Some(ref resource_manager) = self.resource_manager {
972            resource_manager.start().await.map_err(|e| {
973                P2PError::Network(crate::error::NetworkError::ProtocolError(
974                    format!("Failed to start resource manager: {e}").into(),
975                ))
976            })?;
977            info!("Production resource manager started");
978        }
979
980        // Start bootstrap manager background tasks
981        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
982            let mut manager = bootstrap_manager.write().await;
983            manager.start_background_tasks().await.map_err(|e| {
984                P2PError::Network(crate::error::NetworkError::ProtocolError(
985                    format!("Failed to start bootstrap manager: {e}").into(),
986                ))
987            })?;
988            info!("Bootstrap cache manager started");
989        }
990
991        // Set running state
992        *self.running.write().await = true;
993
994        // Start listening on configured addresses using transport layer
995        self.start_network_listeners().await?;
996
997        // Log current listen addresses
998        let listen_addrs = self.listen_addrs.read().await;
999        info!("P2P node started on addresses: {:?}", *listen_addrs);
1000
1001        // MCP removed
1002
1003        // Start message receiving system
1004        self.start_message_receiving_system().await?;
1005
1006        // Connect to bootstrap peers
1007        self.connect_bootstrap_peers().await?;
1008
1009        Ok(())
1010    }
1011
1012    /// Start network listeners on configured addresses
1013    async fn start_network_listeners(&self) -> Result<()> {
1014        info!("Starting dual-stack listeners (ant-quic)...");
1015        // Update our listen_addrs from the dual node bindings
1016        let addrs = self.dual_node.local_addrs().await.map_err(|e| {
1017            P2PError::Transport(crate::error::TransportError::SetupFailed(
1018                format!("Failed to get local addresses: {}", e).into(),
1019            ))
1020        })?;
1021        {
1022            let mut la = self.listen_addrs.write().await;
1023            *la = addrs.clone();
1024        }
1025
1026        // Spawn a background accept loop that handles incoming connections from either stack
1027        let event_tx = self.event_tx.clone();
1028        let peers = self.peers.clone();
1029        let active_connections = self.active_connections.clone();
1030        let rate_limiter = self.rate_limiter.clone();
1031        let dual = self.dual_node.clone();
1032        tokio::spawn(async move {
1033            loop {
1034                match dual.accept_any().await {
1035                    Ok((ant_peer_id, remote_sock)) => {
1036                        let peer_id =
1037                            crate::transport::ant_quic_adapter::ant_peer_id_to_string(&ant_peer_id);
1038                        let remote_addr = NetworkAddress::from(remote_sock);
1039                        // Optional: basic IP rate limiting
1040                        let _ = rate_limiter.check_ip(&remote_sock.ip());
1041                        let _ = event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1042                        register_new_peer(&peers, &peer_id, &remote_addr).await;
1043                        active_connections.write().await.insert(peer_id);
1044                    }
1045                    Err(e) => {
1046                        warn!("Accept failed: {}", e);
1047                        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1048                    }
1049                }
1050            }
1051        });
1052
1053        info!("Dual-stack listeners active on: {:?}", addrs);
1054        Ok(())
1055    }
1056
1057    /// Start a listener on a specific socket address
1058    #[allow(dead_code)]
1059    async fn start_listener_on_address(&self, addr: std::net::SocketAddr) -> Result<()> {
1060        // use crate::transport::{Transport}; // Unused during migration
1061
1062        // DISABLED during ant-quic migration - TODO: Reimplement using AntQuicAdapter
1063        /*
1064        // Try QUIC first (preferred transport)
1065        match crate::transport::QuicTransport::new(Default::default()) {
1066            Ok(quic_transport) => {
1067                match quic_transport.listen(NetworkAddress::new(addr)).await {
1068                    Ok(listen_addrs) => {
1069                        info!("QUIC listener started on {} -> {:?}", addr, listen_addrs);
1070
1071                        // Store the actual listening addresses in the node
1072                        {
1073                            let mut node_listen_addrs = self.listen_addrs.write().await;
1074                            // Don't clear - accumulate addresses from multiple listeners
1075                            node_listen_addrs.push(listen_addrs.socket_addr());
1076                        }
1077
1078                        // Start accepting connections in background
1079                        self.start_connection_acceptor(
1080                            Arc::new(quic_transport),
1081                            addr,
1082                            crate::transport::TransportType::QUIC
1083                        ).await?;
1084
1085                        return Ok(());
1086                    }
1087                    Err(e) => {
1088                        warn!("Failed to start QUIC listener on {}: {}", addr, e);
1089                    }
1090                }
1091            }
1092            Err(e) => {
1093                warn!("Failed to create QUIC transport for listening: {}", e);
1094            }
1095        }
1096        */
1097
1098        warn!("QUIC transport temporarily disabled during ant-quic migration");
1099        // No TCP fallback - QUIC only
1100        Err(crate::P2PError::Transport(
1101            crate::error::TransportError::SetupFailed(
1102                format!(
1103                    "Failed to start QUIC listener on {addr} - transport disabled during migration"
1104                )
1105                .into(),
1106            ),
1107        ))
1108    }
1109
1110    /// Start connection acceptor background task
1111    #[allow(dead_code)] // Deprecated during ant-quic migration
1112    async fn start_connection_acceptor(
1113        &self,
1114        transport: Arc<dyn crate::transport::Transport>,
1115        addr: std::net::SocketAddr,
1116        transport_type: crate::transport::TransportType,
1117    ) -> Result<()> {
1118        info!(
1119            "Starting connection acceptor for {:?} on {}",
1120            transport_type, addr
1121        );
1122
1123        // Clone necessary data for the background task
1124        let event_tx = self.event_tx.clone();
1125        let _peer_id = self.peer_id.clone();
1126        let peers = Arc::clone(&self.peers);
1127        // ant-quic dual-stack node is managed separately; accept loop started in start_network_listeners
1128
1129        let rate_limiter = Arc::clone(&self.rate_limiter);
1130
1131        // Spawn background task to accept incoming connections
1132        tokio::spawn(async move {
1133            loop {
1134                match transport.accept().await {
1135                    Ok(connection) => {
1136                        let remote_addr = connection.remote_addr();
1137                        let connection_peer_id =
1138                            format!("peer_from_{}", remote_addr.to_string().replace(":", "_"));
1139
1140                        // Apply rate limiting for incoming connections
1141                        let socket_addr = remote_addr.socket_addr();
1142                        if check_rate_limit(&rate_limiter, &socket_addr, &remote_addr).is_err() {
1143                            // Connection dropped automatically when it goes out of scope
1144                            continue;
1145                        }
1146
1147                        info!(
1148                            "Accepted {:?} connection from {} (peer: {})",
1149                            transport_type, remote_addr, connection_peer_id
1150                        );
1151
1152                        // Generate peer connected event
1153                        let _ = event_tx.send(P2PEvent::PeerConnected(connection_peer_id.clone()));
1154
1155                        // Store the peer connection
1156                        register_new_peer(&peers, &connection_peer_id, &remote_addr).await;
1157
1158                        // Spawn task to handle this specific connection's messages
1159                        spawn_connection_handler(
1160                            connection,
1161                            connection_peer_id,
1162                            event_tx.clone(),
1163                            Arc::clone(&peers),
1164                        );
1165                    }
1166                    Err(e) => {
1167                        warn!(
1168                            "Failed to accept {:?} connection on {}: {}",
1169                            transport_type, addr, e
1170                        );
1171
1172                        // Brief pause before retrying to avoid busy loop
1173                        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
1174                    }
1175                }
1176            }
1177        });
1178
1179        info!(
1180            "Connection acceptor background task started for {:?} on {}",
1181            transport_type, addr
1182        );
1183        Ok(())
1184    }
1185
1186    /// Start the message receiving system with background tasks
1187    async fn start_message_receiving_system(&self) -> Result<()> {
1188        info!("Starting message receiving system");
1189        let dual = self.dual_node.clone();
1190        let event_tx = self.event_tx.clone();
1191
1192        tokio::spawn(async move {
1193            loop {
1194                match dual.receive_any().await {
1195                    Ok((_peer_id, bytes)) => {
1196                        // Expect the JSON message wrapper from create_protocol_message
1197                        #[allow(clippy::collapsible_if)]
1198                        if let Ok(value) = serde_json::from_slice::<serde_json::Value>(&bytes) {
1199                            if let (Some(protocol), Some(data), Some(from)) = (
1200                                value.get("protocol").and_then(|v| v.as_str()),
1201                                value.get("data").and_then(|v| v.as_array()),
1202                                value.get("from").and_then(|v| v.as_str()),
1203                            ) {
1204                                let payload: Vec<u8> = data
1205                                    .iter()
1206                                    .filter_map(|v| v.as_u64().map(|n| n as u8))
1207                                    .collect();
1208                                let _ = event_tx.send(P2PEvent::Message {
1209                                    topic: protocol.to_string(),
1210                                    source: from.to_string(),
1211                                    data: payload,
1212                                });
1213                            }
1214                        }
1215                    }
1216                    Err(e) => {
1217                        warn!("Receive error: {}", e);
1218                        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1219                    }
1220                }
1221            }
1222        });
1223
1224        Ok(())
1225    }
1226
1227    /// Handle a received message and generate appropriate events
1228    #[allow(dead_code)]
1229    async fn handle_received_message(
1230        &self,
1231        message_data: Vec<u8>,
1232        peer_id: &PeerId,
1233        _protocol: &str,
1234        event_tx: &broadcast::Sender<P2PEvent>,
1235    ) -> Result<()> {
1236        // MCP removed: no special protocol handling
1237
1238        // Parse the message format we created in create_protocol_message
1239        match serde_json::from_slice::<serde_json::Value>(&message_data) {
1240            Ok(message) => {
1241                if let (Some(protocol), Some(data), Some(from)) = (
1242                    message.get("protocol").and_then(|v| v.as_str()),
1243                    message.get("data").and_then(|v| v.as_array()),
1244                    message.get("from").and_then(|v| v.as_str()),
1245                ) {
1246                    // Convert data array back to bytes
1247                    let data_bytes: Vec<u8> = data
1248                        .iter()
1249                        .filter_map(|v| v.as_u64().map(|n| n as u8))
1250                        .collect();
1251
1252                    // Generate message event
1253                    let event = P2PEvent::Message {
1254                        topic: protocol.to_string(),
1255                        source: from.to_string(),
1256                        data: data_bytes,
1257                    };
1258
1259                    let _ = event_tx.send(event);
1260                    debug!("Generated message event from peer: {}", peer_id);
1261                }
1262            }
1263            Err(e) => {
1264                warn!("Failed to parse received message from {}: {}", peer_id, e);
1265            }
1266        }
1267
1268        Ok(())
1269    }
1270
1271    // MCP removed
1272
1273    // MCP removed
1274
1275    /// Run the P2P node (blocks until shutdown)
1276    pub async fn run(&self) -> Result<()> {
1277        if !*self.running.read().await {
1278            self.start().await?;
1279        }
1280
1281        info!("P2P node running...");
1282
1283        // Main event loop
1284        loop {
1285            if !*self.running.read().await {
1286                break;
1287            }
1288
1289            // Perform periodic tasks
1290            self.periodic_tasks().await?;
1291
1292            // Sleep for a short interval
1293            tokio::time::sleep(Duration::from_millis(100)).await;
1294        }
1295
1296        info!("P2P node stopped");
1297        Ok(())
1298    }
1299
1300    /// Stop the P2P node
1301    pub async fn stop(&self) -> Result<()> {
1302        info!("Stopping P2P node...");
1303
1304        // Set running state to false
1305        *self.running.write().await = false;
1306
1307        // Disconnect all peers
1308        self.disconnect_all_peers().await?;
1309
1310        // Shutdown production resource manager if configured
1311        if let Some(ref resource_manager) = self.resource_manager {
1312            resource_manager.shutdown().await.map_err(|e| {
1313                P2PError::Network(crate::error::NetworkError::ProtocolError(
1314                    format!("Failed to shutdown resource manager: {e}").into(),
1315                ))
1316            })?;
1317            info!("Production resource manager stopped");
1318        }
1319
1320        info!("P2P node stopped");
1321        Ok(())
1322    }
1323
1324    /// Graceful shutdown alias for tests
1325    pub async fn shutdown(&self) -> Result<()> {
1326        self.stop().await
1327    }
1328
1329    /// Check if the node is running
1330    pub async fn is_running(&self) -> bool {
1331        *self.running.read().await
1332    }
1333
1334    /// Get the current listen addresses
1335    pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
1336        self.listen_addrs.read().await.clone()
1337    }
1338
1339    /// Get connected peers
1340    pub async fn connected_peers(&self) -> Vec<PeerId> {
1341        // "Connected" is defined as currently active at the transport layer.
1342        // The peers map may contain historical peers with Disconnected/Failed status.
1343        self.active_connections
1344            .read()
1345            .await
1346            .iter()
1347            .cloned()
1348            .collect()
1349    }
1350
1351    /// Get peer count
1352    pub async fn peer_count(&self) -> usize {
1353        self.active_connections.read().await.len()
1354    }
1355
1356    /// Get peer info
1357    pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1358        self.peers.read().await.get(peer_id).cloned()
1359    }
1360
1361    /// Get the peer ID for a given socket address, if connected
1362    ///
1363    /// This method searches through all connected peers to find one that has
1364    /// the specified address in its address list.
1365    ///
1366    /// # Arguments
1367    /// * `addr` - The socket address to search for (e.g., "192.168.1.100:9000")
1368    ///
1369    /// # Returns
1370    /// * `Some(PeerId)` - The peer ID if a matching connected peer is found
1371    /// * `None` - If no peer with this address is currently connected
1372    pub async fn get_peer_id_by_address(&self, addr: &str) -> Option<PeerId> {
1373        // Parse the address to a SocketAddr for comparison
1374        let socket_addr: std::net::SocketAddr = addr.parse().ok()?;
1375
1376        let peers = self.peers.read().await;
1377
1378        // Search through all connected peers
1379        for (peer_id, peer_info) in peers.iter() {
1380            // Check if this peer has a matching address
1381            for peer_addr in &peer_info.addresses {
1382                if let Ok(peer_socket) = peer_addr.parse::<std::net::SocketAddr>()
1383                    && peer_socket == socket_addr
1384                {
1385                    return Some(peer_id.clone());
1386                }
1387            }
1388        }
1389
1390        None
1391    }
1392
1393    /// List all active connections with their peer IDs and addresses
1394    ///
1395    /// # Returns
1396    /// A vector of tuples containing (PeerId, Vec<String>) where the Vec<String>
1397    /// contains all known addresses for that peer.
1398    pub async fn list_active_connections(&self) -> Vec<(PeerId, Vec<String>)> {
1399        let active = self.active_connections.read().await;
1400        let peers = self.peers.read().await;
1401
1402        active
1403            .iter()
1404            .map(|peer_id| {
1405                let addresses = peers
1406                    .get(peer_id)
1407                    .map(|info| info.addresses.clone())
1408                    .unwrap_or_default();
1409                (peer_id.clone(), addresses)
1410            })
1411            .collect()
1412    }
1413
1414    /// Remove a peer from the peers map
1415    ///
1416    /// This method removes a peer from the internal peers map. It should be used
1417    /// when a connection is no longer valid (e.g., after detecting that the underlying
1418    /// ant-quic connection has closed).
1419    ///
1420    /// # Arguments
1421    /// * `peer_id` - The ID of the peer to remove
1422    ///
1423    /// # Returns
1424    /// `true` if the peer was found and removed, `false` if the peer was not in the map
1425    pub async fn remove_peer(&self, peer_id: &PeerId) -> bool {
1426        // Remove from active connections tracking
1427        self.active_connections.write().await.remove(peer_id);
1428        // Remove from peers map and return whether it existed
1429        self.peers.write().await.remove(peer_id).is_some()
1430    }
1431
1432    /// Check if a peer is connected
1433    ///
1434    /// This method checks if the peer ID exists in the peers map. Note that this
1435    /// only verifies the peer is registered - it does not guarantee the underlying
1436    /// ant-quic connection is still active. For connection validation, use `send_message`
1437    /// which will fail if the connection is closed.
1438    ///
1439    /// # Arguments
1440    /// * `peer_id` - The ID of the peer to check
1441    ///
1442    /// # Returns
1443    /// `true` if the peer exists in the peers map, `false` otherwise
1444    pub async fn is_peer_connected(&self, peer_id: &PeerId) -> bool {
1445        self.peers.read().await.contains_key(peer_id)
1446    }
1447
1448    /// Connect to a peer
1449    pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
1450        info!("Connecting to peer at: {}", address);
1451
1452        // Check production limits if resource manager is enabled
1453        let _connection_guard = if let Some(ref resource_manager) = self.resource_manager {
1454            Some(resource_manager.acquire_connection().await?)
1455        } else {
1456            None
1457        };
1458
1459        // Parse the address to SocketAddr format
1460        let socket_addr: std::net::SocketAddr = address.parse().map_err(|e| {
1461            P2PError::Network(crate::error::NetworkError::InvalidAddress(
1462                format!("{}: {}", address, e).into(),
1463            ))
1464        })?;
1465
1466        // Normalize wildcard addresses to loopback for local connections
1467        // This converts [::]:port → ::1:port and 0.0.0.0:port → 127.0.0.1:port
1468        let normalized_addr = normalize_wildcard_to_loopback(socket_addr);
1469        if normalized_addr != socket_addr {
1470            info!(
1471                "Normalized wildcard address {} to loopback {}",
1472                socket_addr, normalized_addr
1473            );
1474        }
1475
1476        // Establish a real connection via dual-stack Happy Eyeballs, but cap the wait
1477        let addr_list = vec![normalized_addr];
1478        let peer_id = match tokio::time::timeout(
1479            self.config.connection_timeout,
1480            self.dual_node.connect_happy_eyeballs(&addr_list),
1481        )
1482        .await
1483        {
1484            Ok(Ok(peer)) => {
1485                let connected_peer_id =
1486                    crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer);
1487                info!("Successfully connected to peer: {}", connected_peer_id);
1488                connected_peer_id
1489            }
1490            Ok(Err(e)) => {
1491                warn!("Failed to connect to peer at {}: {}", address, e);
1492                return Err(P2PError::Transport(
1493                    crate::error::TransportError::ConnectionFailed {
1494                        addr: normalized_addr,
1495                        reason: e.to_string().into(),
1496                    },
1497                ));
1498            }
1499            Err(_) => {
1500                warn!(
1501                    "Timed out connecting to peer at {} after {:?}",
1502                    address, self.config.connection_timeout
1503                );
1504                return Err(P2PError::Timeout(self.config.connection_timeout));
1505            }
1506        };
1507
1508        // Create peer info with connection details
1509        let peer_info = PeerInfo {
1510            peer_id: peer_id.clone(),
1511            addresses: vec![address.to_string()],
1512            connected_at: Instant::now(),
1513            last_seen: Instant::now(),
1514            status: ConnectionStatus::Connected,
1515            protocols: vec!["p2p-foundation/1.0".to_string()],
1516            heartbeat_count: 0,
1517        };
1518
1519        // Store peer information
1520        self.peers.write().await.insert(peer_id.clone(), peer_info);
1521
1522        // Add to active connections tracking
1523        // This is critical for is_connection_active() to work correctly
1524        self.active_connections
1525            .write()
1526            .await
1527            .insert(peer_id.clone());
1528
1529        // Record bandwidth usage if resource manager is enabled
1530        if let Some(ref resource_manager) = self.resource_manager {
1531            resource_manager.record_bandwidth(0, 0); // Placeholder for handshake data
1532        }
1533
1534        // Emit connection event
1535        let _ = self.event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1536
1537        info!("Connected to peer: {}", peer_id);
1538        Ok(peer_id)
1539    }
1540
1541    /// Disconnect from a peer
1542    pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1543        info!("Disconnecting from peer: {}", peer_id);
1544
1545        // Remove from active connections
1546        self.active_connections.write().await.remove(peer_id);
1547
1548        if let Some(mut peer_info) = self.peers.write().await.remove(peer_id) {
1549            peer_info.status = ConnectionStatus::Disconnected;
1550
1551            // Emit event
1552            let _ = self
1553                .event_tx
1554                .send(P2PEvent::PeerDisconnected(peer_id.clone()));
1555
1556            info!("Disconnected from peer: {}", peer_id);
1557        }
1558
1559        Ok(())
1560    }
1561
1562    /// Check if a connection to a peer is active
1563    pub async fn is_connection_active(&self, peer_id: &str) -> bool {
1564        self.active_connections.read().await.contains(peer_id)
1565    }
1566
1567    /// Send a message to a peer
1568    pub async fn send_message(
1569        &self,
1570        peer_id: &PeerId,
1571        protocol: &str,
1572        data: Vec<u8>,
1573    ) -> Result<()> {
1574        debug!(
1575            "Sending message to peer {} on protocol {}",
1576            peer_id, protocol
1577        );
1578
1579        // Check rate limits if resource manager is enabled
1580        if let Some(ref resource_manager) = self.resource_manager
1581            && !resource_manager
1582                .check_rate_limit(peer_id, "message")
1583                .await?
1584        {
1585            return Err(P2PError::ResourceExhausted(
1586                format!("Rate limit exceeded for peer {}", peer_id).into(),
1587            ));
1588        }
1589
1590        // Check if peer exists in peers map
1591        if !self.peers.read().await.contains_key(peer_id) {
1592            return Err(P2PError::Network(crate::error::NetworkError::PeerNotFound(
1593                peer_id.to_string().into(),
1594            )));
1595        }
1596
1597        // **NEW**: Check if the ant-quic connection is actually active
1598        // This is the critical fix for the connection state synchronization issue
1599        if !self.is_connection_active(peer_id).await {
1600            debug!(
1601                "Connection to peer {} exists in peers map but ant-quic connection is closed",
1602                peer_id
1603            );
1604
1605            // Clean up stale peer entry
1606            self.remove_peer(peer_id).await;
1607
1608            return Err(P2PError::Network(
1609                crate::error::NetworkError::ConnectionClosed {
1610                    peer_id: peer_id.to_string().into(),
1611                },
1612            ));
1613        }
1614
1615        // MCP removed: no special-case protocol validation
1616
1617        // Record bandwidth usage if resource manager is enabled
1618        if let Some(ref resource_manager) = self.resource_manager {
1619            resource_manager.record_bandwidth(data.len() as u64, 0);
1620        }
1621
1622        // Create protocol message wrapper
1623        let _message_data = self.create_protocol_message(protocol, data)?;
1624
1625        // Send via ant-quic dual-node
1626        let send_fut = self.dual_node.send_to_peer_string(peer_id, &_message_data);
1627        tokio::time::timeout(self.config.connection_timeout, send_fut)
1628            .await
1629            .map_err(|_| {
1630                P2PError::Transport(crate::error::TransportError::StreamError(
1631                    "Timed out sending message".into(),
1632                ))
1633            })?
1634            .map_err(|e| {
1635                P2PError::Transport(crate::error::TransportError::StreamError(
1636                    e.to_string().into(),
1637                ))
1638            })
1639    }
1640
1641    /// Create a protocol message wrapper
1642    fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1643        use serde_json::json;
1644
1645        let timestamp = std::time::SystemTime::now()
1646            .duration_since(std::time::UNIX_EPOCH)
1647            .map_err(|e| {
1648                P2PError::Network(NetworkError::ProtocolError(
1649                    format!("System time error: {}", e).into(),
1650                ))
1651            })?
1652            .as_secs();
1653
1654        // Create a simple message format for P2P communication
1655        let message = json!({
1656            "protocol": protocol,
1657            "data": data,
1658            "from": self.peer_id,
1659            "timestamp": timestamp
1660        });
1661
1662        serde_json::to_vec(&message).map_err(|e| {
1663            P2PError::Transport(crate::error::TransportError::StreamError(
1664                format!("Failed to serialize message: {e}").into(),
1665            ))
1666        })
1667    }
1668
1669    // Note: async listen_addrs() already exists above for fetching listen addresses
1670}
1671
1672/// Create a protocol message wrapper (static version for background tasks)
1673#[allow(dead_code)]
1674fn create_protocol_message_static(protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1675    use serde_json::json;
1676
1677    let timestamp = std::time::SystemTime::now()
1678        .duration_since(std::time::UNIX_EPOCH)
1679        .map_err(|e| {
1680            P2PError::Network(NetworkError::ProtocolError(
1681                format!("System time error: {}", e).into(),
1682            ))
1683        })?
1684        .as_secs();
1685
1686    // Create a simple message format for P2P communication
1687    let message = json!({
1688        "protocol": protocol,
1689        "data": data,
1690        "timestamp": timestamp
1691    });
1692
1693    serde_json::to_vec(&message).map_err(|e| {
1694        P2PError::Transport(crate::error::TransportError::StreamError(
1695            format!("Failed to serialize message: {e}").into(),
1696        ))
1697    })
1698}
1699
1700impl P2PNode {
1701    /// Subscribe to network events
1702    pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1703        self.event_tx.subscribe()
1704    }
1705
1706    /// Backwards-compat event stream accessor for tests
1707    pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1708        self.subscribe_events()
1709    }
1710
1711    /// Get node uptime
1712    pub fn uptime(&self) -> Duration {
1713        self.start_time.elapsed()
1714    }
1715
1716    // =========================================================================
1717    // Attestation Methods (Phase 1: Soft Enforcement)
1718    // =========================================================================
1719
1720    /// Compute the BLAKE3 hash of the running binary.
1721    ///
1722    /// In production, this reads the actual executable file and hashes it.
1723    /// Returns a placeholder hash if the binary cannot be read.
1724    fn compute_binary_hash() -> [u8; 32] {
1725        // Try to get the path to the current executable and hash it
1726        if let Some(hash) = std::env::current_exe()
1727            .ok()
1728            .and_then(|exe_path| std::fs::read(&exe_path).ok())
1729            .map(|binary_data| blake3::hash(&binary_data))
1730        {
1731            return *hash.as_bytes();
1732        }
1733        // Fallback: return a deterministic placeholder based on compile-time info
1734        // This allows tests and development to work without actual binary hashing
1735        let placeholder = format!(
1736            "saorsa-core-v{}-{}",
1737            env!("CARGO_PKG_VERSION"),
1738            std::env::consts::ARCH
1739        );
1740        let hash = blake3::hash(placeholder.as_bytes());
1741        *hash.as_bytes()
1742    }
1743
1744    /// Get this node's binary hash used for attestation.
1745    #[must_use]
1746    pub fn binary_hash(&self) -> &[u8; 32] {
1747        &self.binary_hash
1748    }
1749
1750    /// Get this node's entangled identity, if set.
1751    #[must_use]
1752    pub fn entangled_id(&self) -> Option<&crate::attestation::EntangledId> {
1753        self.entangled_id.as_ref()
1754    }
1755
1756    /// Set the entangled identity for this node.
1757    ///
1758    /// This should be called after the node's cryptographic identity is established,
1759    /// typically by deriving from the NodeIdentity's public key.
1760    pub fn set_entangled_id(&mut self, entangled_id: crate::attestation::EntangledId) {
1761        self.entangled_id = Some(entangled_id);
1762    }
1763
1764    /// Verify a peer's attestation and return the enforcement decision.
1765    ///
1766    /// This function implements the Entangled Attestation verification protocol
1767    /// (Phase 6: Hard Enforcement). Based on the configured enforcement mode:
1768    ///
1769    /// - **Off**: Skips verification entirely
1770    /// - **Soft**: Logs warnings but allows connections
1771    /// - **Hard**: Rejects connections with invalid attestations
1772    ///
1773    /// # Arguments
1774    /// * `peer_id` - The peer's identifier for logging
1775    /// * `peer_entangled_id` - The peer's claimed entangled ID
1776    /// * `peer_public_key` - The peer's ML-DSA public key
1777    ///
1778    /// # Returns
1779    /// An [`EnforcementDecision`] indicating whether to allow or reject the connection.
1780    ///
1781    /// # Example
1782    /// ```rust,ignore
1783    /// let decision = node.verify_peer_attestation(peer_id, &entangled_id, &public_key);
1784    /// if decision.should_reject() {
1785    ///     // Send rejection message and close connection
1786    ///     if let Some(rejection) = decision.rejection() {
1787    ///         send_rejection(peer_id, rejection);
1788    ///     }
1789    ///     disconnect(peer_id);
1790    /// }
1791    /// ```
1792    pub fn verify_peer_attestation(
1793        &self,
1794        peer_id: &str,
1795        peer_entangled_id: &crate::attestation::EntangledId,
1796        peer_public_key: &crate::quantum_crypto::ant_quic_integration::MlDsaPublicKey,
1797    ) -> crate::attestation::EnforcementDecision {
1798        use crate::attestation::{
1799            AttestationRejection, AttestationRejectionReason, EnforcementDecision, EnforcementMode,
1800        };
1801
1802        let config = &self.config.attestation_config;
1803
1804        // Skip verification if attestation is disabled
1805        if !config.enabled {
1806            return EnforcementDecision::Skipped;
1807        }
1808
1809        // Verify the entangled ID derivation
1810        let id_valid = peer_entangled_id.verify(peer_public_key);
1811
1812        // Check binary hash allowlist (if configured)
1813        let binary_hash = *peer_entangled_id.binary_hash();
1814        let binary_allowed = config.is_binary_allowed(&binary_hash);
1815
1816        match config.enforcement_mode {
1817            EnforcementMode::Off => EnforcementDecision::Skipped,
1818
1819            EnforcementMode::Soft => {
1820                // Soft enforcement: log warnings but allow connections
1821                if !id_valid {
1822                    warn!(
1823                        peer = %peer_id,
1824                        binary_hash = %hex::encode(&binary_hash[..8]),
1825                        "Peer attestation verification failed: Invalid entangled ID (soft mode - allowing)"
1826                    );
1827                    return EnforcementDecision::AllowWithWarning {
1828                        reason: AttestationRejectionReason::IdentityMismatch,
1829                    };
1830                }
1831                if !binary_allowed {
1832                    warn!(
1833                        peer = %peer_id,
1834                        binary_hash = %hex::encode(binary_hash),
1835                        "Peer attestation verification failed: Binary not in allowlist (soft mode - allowing)"
1836                    );
1837                    return EnforcementDecision::AllowWithWarning {
1838                        reason: AttestationRejectionReason::BinaryNotAllowed { hash: binary_hash },
1839                    };
1840                }
1841                EnforcementDecision::Allow
1842            }
1843
1844            EnforcementMode::Hard => {
1845                // Hard enforcement: reject invalid attestations
1846                if !id_valid {
1847                    error!(
1848                        peer = %peer_id,
1849                        binary_hash = %hex::encode(&binary_hash[..8]),
1850                        "REJECTING peer: Invalid entangled ID derivation"
1851                    );
1852                    return EnforcementDecision::Reject {
1853                        rejection: AttestationRejection::identity_mismatch(),
1854                    };
1855                }
1856                if !binary_allowed {
1857                    error!(
1858                        peer = %peer_id,
1859                        binary_hash = %hex::encode(binary_hash),
1860                        "REJECTING peer: Binary not in allowlist"
1861                    );
1862                    return EnforcementDecision::Reject {
1863                        rejection: AttestationRejection::binary_not_allowed(binary_hash),
1864                    };
1865                }
1866
1867                info!(
1868                    peer = %peer_id,
1869                    entangled_id = %hex::encode(&peer_entangled_id.id()[..8]),
1870                    "Peer attestation verified successfully (hard mode)"
1871                );
1872                EnforcementDecision::Allow
1873            }
1874        }
1875    }
1876
1877    /// Verify a peer's attestation and return a simple boolean result.
1878    ///
1879    /// This is a convenience method that wraps [`verify_peer_attestation`] for cases
1880    /// where only a pass/fail result is needed without the detailed decision.
1881    ///
1882    /// # Returns
1883    /// `true` if the connection should be allowed, `false` if it should be rejected.
1884    #[must_use]
1885    pub fn verify_peer_attestation_simple(
1886        &self,
1887        peer_id: &str,
1888        peer_entangled_id: &crate::attestation::EntangledId,
1889        peer_public_key: &crate::quantum_crypto::ant_quic_integration::MlDsaPublicKey,
1890    ) -> bool {
1891        self.verify_peer_attestation(peer_id, peer_entangled_id, peer_public_key)
1892            .should_allow()
1893    }
1894
1895    // MCP removed: all MCP tool/service methods removed
1896
1897    // /// Handle MCP remote tool call with network integration
1898
1899    // /// List tools available on a specific remote peer
1900
1901    // /// Get MCP server statistics
1902
1903    /// Get production resource metrics
1904    pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
1905        if let Some(ref resource_manager) = self.resource_manager {
1906            Ok(resource_manager.get_metrics().await)
1907        } else {
1908            Err(P2PError::Network(
1909                crate::error::NetworkError::ProtocolError(
1910                    "Production resource manager not enabled".to_string().into(),
1911                ),
1912            ))
1913        }
1914    }
1915
1916    /// Connection lifecycle monitor task - processes ant-quic connection events
1917    /// and updates active_connections HashSet and peers map.
1918    ///
1919    /// This version accepts a pre-subscribed receiver to avoid the race condition
1920    /// where early connections could be missed if subscription happens after the task starts.
1921    #[allow(clippy::too_many_arguments)]
1922    async fn connection_lifecycle_monitor_with_rx(
1923        _dual_node: Arc<DualStackNetworkNode>,
1924        mut event_rx: broadcast::Receiver<crate::transport::ant_quic_adapter::ConnectionEvent>,
1925        active_connections: Arc<RwLock<HashSet<String>>>,
1926        peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
1927        event_tx: broadcast::Sender<P2PEvent>,
1928        geo_provider: Arc<BgpGeoProvider>,
1929        _local_peer_id: String,
1930    ) {
1931        use crate::transport::ant_quic_adapter::ConnectionEvent;
1932
1933        info!("Connection lifecycle monitor started (pre-subscribed receiver)");
1934
1935        loop {
1936            match event_rx.recv().await {
1937                Ok(event) => {
1938                    match event {
1939                        ConnectionEvent::Established {
1940                            peer_id,
1941                            remote_address,
1942                        } => {
1943                            let peer_id_str =
1944                                crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1945                            debug!(
1946                                "Connection established: peer={}, addr={}",
1947                                peer_id_str, remote_address
1948                            );
1949
1950                            // **GeoIP Validation**
1951                            let ip = remote_address.ip();
1952                            let is_rejected = match ip {
1953                                std::net::IpAddr::V4(v4) => {
1954                                    if let Some(asn) = geo_provider.lookup_ipv4_asn(v4) {
1955                                        geo_provider.is_hosting_asn(asn)
1956                                            || geo_provider.is_vpn_asn(asn)
1957                                    } else {
1958                                        false
1959                                    }
1960                                }
1961                                std::net::IpAddr::V6(v6) => {
1962                                    let info = geo_provider.lookup(v6);
1963                                    info.is_hosting_provider || info.is_vpn_provider
1964                                }
1965                            };
1966
1967                            if is_rejected {
1968                                info!(
1969                                    "Rejecting connection from {} ({}) due to GeoIP policy",
1970                                    peer_id_str, remote_address
1971                                );
1972                                continue;
1973                            }
1974
1975                            // Add to active connections
1976                            active_connections.write().await.insert(peer_id_str.clone());
1977
1978                            // Update peer info or insert new
1979                            let mut peers_lock = peers.write().await;
1980                            if let Some(peer_info) = peers_lock.get_mut(&peer_id_str) {
1981                                peer_info.status = ConnectionStatus::Connected;
1982                                peer_info.connected_at = Instant::now();
1983                            } else {
1984                                debug!("Registering new incoming peer: {}", peer_id_str);
1985                                peers_lock.insert(
1986                                    peer_id_str.clone(),
1987                                    PeerInfo {
1988                                        peer_id: peer_id_str.clone(),
1989                                        addresses: vec![remote_address.to_string()],
1990                                        status: ConnectionStatus::Connected,
1991                                        last_seen: Instant::now(),
1992                                        connected_at: Instant::now(),
1993                                        protocols: Vec::new(),
1994                                        heartbeat_count: 0,
1995                                    },
1996                                );
1997                            }
1998
1999                            // Broadcast connection event
2000                            let _ = event_tx.send(P2PEvent::PeerConnected(peer_id_str));
2001                        }
2002                        ConnectionEvent::Lost { peer_id, reason } => {
2003                            let peer_id_str =
2004                                crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
2005                            debug!("Connection lost: peer={}, reason={}", peer_id_str, reason);
2006
2007                            // Remove from active connections
2008                            active_connections.write().await.remove(&peer_id_str);
2009
2010                            // Update peer info status
2011                            if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
2012                                peer_info.status = ConnectionStatus::Disconnected;
2013                                peer_info.last_seen = Instant::now();
2014                            }
2015
2016                            // Broadcast disconnection event
2017                            let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
2018                        }
2019                        ConnectionEvent::Failed { peer_id, reason } => {
2020                            let peer_id_str =
2021                                crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
2022                            debug!("Connection failed: peer={}, reason={}", peer_id_str, reason);
2023
2024                            // Remove from active connections
2025                            active_connections.write().await.remove(&peer_id_str);
2026
2027                            // Update peer info status
2028                            if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
2029                                peer_info.status = ConnectionStatus::Disconnected;
2030                                peer_info.last_seen = Instant::now();
2031                            }
2032
2033                            // Broadcast disconnection event
2034                            let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
2035                        }
2036                    }
2037                }
2038                Err(broadcast::error::RecvError::Lagged(skipped)) => {
2039                    warn!("Connection event receiver lagged, skipped {} events", skipped);
2040                }
2041                Err(broadcast::error::RecvError::Closed) => {
2042                    info!("Connection event channel closed, stopping lifecycle monitor");
2043                    break;
2044                }
2045            }
2046        }
2047    }
2048
2049    /// Connection lifecycle monitor task - processes ant-quic connection events
2050    /// and updates active_connections HashSet and peers map
2051    ///
2052    /// DEPRECATED: Use `connection_lifecycle_monitor_with_rx` instead to avoid race conditions
2053    #[allow(dead_code)]
2054    async fn connection_lifecycle_monitor(
2055        dual_node: Arc<DualStackNetworkNode>,
2056        active_connections: Arc<RwLock<HashSet<String>>>,
2057        peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
2058        event_tx: broadcast::Sender<P2PEvent>,
2059        geo_provider: Arc<BgpGeoProvider>,
2060        local_peer_id: String,
2061    ) {
2062        use crate::transport::ant_quic_adapter::ConnectionEvent;
2063
2064        let mut event_rx = dual_node.subscribe_connection_events();
2065
2066        info!("Connection lifecycle monitor started");
2067
2068        loop {
2069            match event_rx.recv().await {
2070                Ok(event) => {
2071                    match event {
2072                        ConnectionEvent::Established {
2073                            peer_id,
2074                            remote_address,
2075                        } => {
2076                            let peer_id_str =
2077                                crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
2078                            debug!(
2079                                "Connection established: peer={}, addr={}",
2080                                peer_id_str, remote_address
2081                            );
2082
2083                            // **GeoIP Validation**
2084                            // Check if the peer's IP is allowed
2085                            let ip = remote_address.ip();
2086                            let is_rejected = match ip {
2087                                std::net::IpAddr::V4(v4) => {
2088                                    // Check if it's a hosting provider or VPN
2089                                    if let Some(asn) = geo_provider.lookup_ipv4_asn(v4) {
2090                                        geo_provider.is_hosting_asn(asn)
2091                                            || geo_provider.is_vpn_asn(asn)
2092                                    } else {
2093                                        false
2094                                    }
2095                                }
2096                                std::net::IpAddr::V6(v6) => {
2097                                    let info = geo_provider.lookup(v6);
2098                                    info.is_hosting_provider || info.is_vpn_provider
2099                                }
2100                            };
2101
2102                            if is_rejected {
2103                                info!(
2104                                    "Rejecting connection from {} ({}) due to GeoIP policy (Hosting/VPN)",
2105                                    peer_id_str, remote_address
2106                                );
2107
2108                                // Create rejection message
2109                                let rejection = RejectionMessage {
2110                                    reason: RejectionReason::GeoIpPolicy,
2111                                    message:
2112                                        "Connection rejected: Hosting/VPN providers not allowed"
2113                                            .to_string(),
2114                                    suggested_target: None, // Could suggest a different region if we knew more
2115                                };
2116
2117                                // Serialize message
2118                                if let Ok(data) = serde_json::to_vec(&rejection) {
2119                                    // Create protocol message
2120                                    let timestamp = std::time::SystemTime::now()
2121                                        .duration_since(std::time::UNIX_EPOCH)
2122                                        .unwrap_or_default()
2123                                        .as_secs();
2124
2125                                    let message = serde_json::json!({
2126                                        "protocol": "control",
2127                                        "data": data,
2128                                        "from": local_peer_id,
2129                                        "timestamp": timestamp
2130                                    });
2131
2132                                    if let Ok(msg_bytes) = serde_json::to_vec(&message) {
2133                                        // Send rejection message
2134                                        // We use send_to_peer directly on dual_node to avoid the checks in P2PNode::send_message
2135                                        // which might fail if we haven't fully registered the peer yet
2136                                        let _ = dual_node.send_to_peer(&peer_id, &msg_bytes).await;
2137
2138                                        // Give it a moment to send before disconnecting?
2139                                        // ant-quic might handle this, but a small yield is safe
2140                                        tokio::task::yield_now().await;
2141                                    }
2142                                }
2143
2144                                // Disconnect (TODO: Add disconnect method to dual_node or just drop?)
2145                                // For now, we just don't add it to active connections, effectively ignoring it
2146                                // Ideally we should actively close the connection
2147                                continue;
2148                            }
2149
2150                            // Add to active connections
2151                            active_connections.write().await.insert(peer_id_str.clone());
2152
2153                            // Update peer info or insert new
2154                            let mut peers_lock = peers.write().await;
2155                            if let Some(peer_info) = peers_lock.get_mut(&peer_id_str) {
2156                                peer_info.status = ConnectionStatus::Connected;
2157                                peer_info.connected_at = Instant::now();
2158                            } else {
2159                                // New incoming peer
2160                                debug!("Registering new incoming peer: {}", peer_id_str);
2161                                peers_lock.insert(
2162                                    peer_id_str.clone(),
2163                                    PeerInfo {
2164                                        peer_id: peer_id_str.clone(),
2165                                        addresses: vec![remote_address.to_string()],
2166                                        status: ConnectionStatus::Connected,
2167                                        last_seen: Instant::now(),
2168                                        connected_at: Instant::now(),
2169                                        protocols: Vec::new(),
2170                                        heartbeat_count: 0,
2171                                    },
2172                                );
2173                            }
2174
2175                            // Broadcast connection event
2176                            let _ = event_tx.send(P2PEvent::PeerConnected(peer_id_str));
2177                        }
2178                        ConnectionEvent::Lost { peer_id, reason } => {
2179                            let peer_id_str =
2180                                crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
2181                            debug!("Connection lost: peer={}, reason={}", peer_id_str, reason);
2182
2183                            // Remove from active connections
2184                            active_connections.write().await.remove(&peer_id_str);
2185
2186                            // Update peer info status
2187                            if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
2188                                peer_info.status = ConnectionStatus::Disconnected;
2189                                peer_info.last_seen = Instant::now();
2190                            }
2191
2192                            // Broadcast disconnection event
2193                            let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
2194                        }
2195                        ConnectionEvent::Failed { peer_id, reason } => {
2196                            let peer_id_str =
2197                                crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
2198                            warn!("Connection failed: peer={}, reason={}", peer_id_str, reason);
2199
2200                            // Remove from active connections
2201                            active_connections.write().await.remove(&peer_id_str);
2202
2203                            // Update peer info status
2204                            if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
2205                                peer_info.status = ConnectionStatus::Failed(reason.clone());
2206                            }
2207
2208                            // Broadcast disconnection event
2209                            let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
2210                        }
2211                    }
2212                }
2213                Err(broadcast::error::RecvError::Lagged(skipped)) => {
2214                    warn!(
2215                        "Connection event monitor lagged, skipped {} events",
2216                        skipped
2217                    );
2218                    continue;
2219                }
2220                Err(broadcast::error::RecvError::Closed) => {
2221                    info!("Connection event channel closed, stopping monitor");
2222                    break;
2223                }
2224            }
2225        }
2226
2227        info!("Connection lifecycle monitor stopped");
2228    }
2229
2230    /// Start connection monitor (called after node initialization)
2231    async fn start_connection_monitor(&self) {
2232        // The monitor task is already spawned in new() with a temporary peers map
2233        // This method is a placeholder for future enhancements where we might
2234        // need to restart the monitor or provide it with updated references
2235        debug!("Connection monitor already running from initialization");
2236    }
2237
2238    /// Keepalive task - sends periodic pings to prevent 30-second idle timeout
2239    ///
2240    /// ant-quic has a 30-second max_idle_timeout. This task sends a small keepalive
2241    /// message every 15 seconds (half the timeout) to all active connections to prevent
2242    /// them from timing out during periods of inactivity.
2243    async fn keepalive_task(
2244        active_connections: Arc<RwLock<HashSet<String>>>,
2245        dual_node: Arc<DualStackNetworkNode>,
2246        shutdown: Arc<AtomicBool>,
2247    ) {
2248        use tokio::time::{Duration, interval};
2249
2250        const KEEPALIVE_INTERVAL_SECS: u64 = 15; // Half of 30-second timeout
2251        const KEEPALIVE_PAYLOAD: &[u8] = b"keepalive"; // Small payload
2252
2253        let mut interval = interval(Duration::from_secs(KEEPALIVE_INTERVAL_SECS));
2254        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
2255
2256        info!(
2257            "Keepalive task started (interval: {}s)",
2258            KEEPALIVE_INTERVAL_SECS
2259        );
2260
2261        loop {
2262            // Check shutdown flag first
2263            if shutdown.load(Ordering::Relaxed) {
2264                info!("Keepalive task shutting down");
2265                break;
2266            }
2267
2268            interval.tick().await;
2269
2270            // Get snapshot of active connections
2271            let peers: Vec<String> = { active_connections.read().await.iter().cloned().collect() };
2272
2273            if peers.is_empty() {
2274                trace!("Keepalive: no active connections");
2275                continue;
2276            }
2277
2278            debug!("Sending keepalive to {} active connections", peers.len());
2279
2280            // Send keepalive to each peer
2281            for peer_id in peers {
2282                match dual_node
2283                    .send_to_peer_string(&peer_id, KEEPALIVE_PAYLOAD)
2284                    .await
2285                {
2286                    Ok(_) => {
2287                        trace!("Keepalive sent to peer: {}", peer_id);
2288                    }
2289                    Err(e) => {
2290                        debug!(
2291                            "Failed to send keepalive to peer {}: {} (connection may have closed)",
2292                            peer_id, e
2293                        );
2294                        // Don't remove from active_connections here - let the lifecycle monitor handle it
2295                    }
2296                }
2297            }
2298        }
2299
2300        info!("Keepalive task stopped");
2301    }
2302
2303    /// Check system health
2304    pub async fn health_check(&self) -> Result<()> {
2305        if let Some(ref resource_manager) = self.resource_manager {
2306            resource_manager.health_check().await
2307        } else {
2308            // Basic health check without resource manager
2309            let peer_count = self.peer_count().await;
2310            if peer_count > self.config.max_connections {
2311                Err(P2PError::Network(
2312                    crate::error::NetworkError::ProtocolError(
2313                        format!("Too many connections: {peer_count}").into(),
2314                    ),
2315                ))
2316            } else {
2317                Ok(())
2318            }
2319        }
2320    }
2321
2322    /// Get production configuration (if enabled)
2323    pub fn production_config(&self) -> Option<&ProductionConfig> {
2324        self.config.production_config.as_ref()
2325    }
2326
2327    /// Check if production hardening is enabled
2328    pub fn is_production_mode(&self) -> bool {
2329        self.resource_manager.is_some()
2330    }
2331
2332    /// Get DHT reference
2333    pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
2334        self.dht.as_ref()
2335    }
2336
2337    /// Store a value in the DHT
2338    pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
2339        if let Some(ref dht) = self.dht {
2340            let mut dht_instance = dht.write().await;
2341            let dht_key = crate::dht::DhtKey::from_bytes(key);
2342            dht_instance
2343                .store(&dht_key, value.clone())
2344                .await
2345                .map_err(|e| {
2346                    P2PError::Dht(crate::error::DhtError::StoreFailed(
2347                        format!("{:?}: {e}", key).into(),
2348                    ))
2349                })?;
2350
2351            Ok(())
2352        } else {
2353            Err(P2PError::Dht(crate::error::DhtError::RoutingError(
2354                "DHT not enabled".to_string().into(),
2355            )))
2356        }
2357    }
2358
2359    /// Retrieve a value from the DHT
2360    pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
2361        if let Some(ref dht) = self.dht {
2362            let dht_instance = dht.read().await;
2363            let dht_key = crate::dht::DhtKey::from_bytes(key);
2364            let record_result = dht_instance.retrieve(&dht_key).await.map_err(|e| {
2365                P2PError::Dht(crate::error::DhtError::StoreFailed(
2366                    format!("Retrieve failed: {e}").into(),
2367                ))
2368            })?;
2369
2370            Ok(record_result)
2371        } else {
2372            Err(P2PError::Dht(crate::error::DhtError::RoutingError(
2373                "DHT not enabled".to_string().into(),
2374            )))
2375        }
2376    }
2377
2378    /// Add a discovered peer to the bootstrap cache
2379    pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
2380        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2381            let mut manager = bootstrap_manager.write().await;
2382            let socket_addresses: Vec<std::net::SocketAddr> = addresses
2383                .iter()
2384                .filter_map(|addr| addr.parse().ok())
2385                .collect();
2386            let contact = ContactEntry::new(peer_id, socket_addresses);
2387            manager.add_contact(contact).await.map_err(|e| {
2388                P2PError::Network(crate::error::NetworkError::ProtocolError(
2389                    format!("Failed to add peer to bootstrap cache: {e}").into(),
2390                ))
2391            })?;
2392        }
2393        Ok(())
2394    }
2395
2396    /// Update connection metrics for a peer in the bootstrap cache
2397    pub async fn update_peer_metrics(
2398        &self,
2399        peer_id: &PeerId,
2400        success: bool,
2401        latency_ms: Option<u64>,
2402        _error: Option<String>,
2403    ) -> Result<()> {
2404        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2405            let mut manager = bootstrap_manager.write().await;
2406
2407            // Create quality metrics based on the connection result
2408            let metrics = QualityMetrics {
2409                success_rate: if success { 1.0 } else { 0.0 },
2410                avg_latency_ms: latency_ms.unwrap_or(0) as f64,
2411                quality_score: if success { 0.8 } else { 0.2 }, // Initial score
2412                last_connection_attempt: chrono::Utc::now(),
2413                last_successful_connection: if success {
2414                    chrono::Utc::now()
2415                } else {
2416                    chrono::Utc::now() - chrono::Duration::hours(1)
2417                },
2418                uptime_score: 0.5,
2419            };
2420
2421            manager
2422                .update_contact_metrics(peer_id, metrics)
2423                .await
2424                .map_err(|e| {
2425                    P2PError::Network(crate::error::NetworkError::ProtocolError(
2426                        format!("Failed to update peer metrics: {e}").into(),
2427                    ))
2428                })?;
2429        }
2430        Ok(())
2431    }
2432
2433    /// Get bootstrap cache statistics
2434    pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
2435        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2436            let manager = bootstrap_manager.read().await;
2437            let stats = manager.get_stats().await.map_err(|e| {
2438                P2PError::Network(crate::error::NetworkError::ProtocolError(
2439                    format!("Failed to get bootstrap stats: {e}").into(),
2440                ))
2441            })?;
2442            Ok(Some(stats))
2443        } else {
2444            Ok(None)
2445        }
2446    }
2447
2448    /// Get the number of cached bootstrap peers
2449    pub async fn cached_peer_count(&self) -> usize {
2450        if let Some(ref _bootstrap_manager) = self.bootstrap_manager
2451            && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
2452        {
2453            return stats.total_contacts;
2454        }
2455        0
2456    }
2457
    /// Connect to bootstrap peers.
    ///
    /// Candidates come from two sources, in this order:
    ///
    /// 1. Configured peers: `config.bootstrap_peers_str` when it is non-empty,
    ///    otherwise `config.bootstrap_peers` converted to strings. Each entry must
    ///    parse as a `SocketAddr`; entries that do not are skipped with a warning.
    /// 2. Up to 20 contacts from the bootstrap manager (when present), restricted to
    ///    addresses not already collected.
    ///
    /// For every contact its addresses are tried in order until one connects. The
    /// bootstrap cache is updated after each successful attempt, and after failed
    /// attempts only when at least one cached contact was added.
    ///
    /// Always returns `Ok(())`: having no candidates, or failing to reach any of them,
    /// is logged but not treated as an error.
    async fn connect_bootstrap_peers(&self) -> Result<()> {
        let mut bootstrap_contacts = Vec::new();
        // Set once at least one contact from the bootstrap cache has been queued.
        let mut used_cache = false;
        // Addresses already queued; used to keep cached contacts from repeating them.
        let mut seen_addresses = std::collections::HashSet::new();

        // CLI-provided bootstrap peers take priority - always include them first
        let cli_bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
            self.config.bootstrap_peers_str.clone()
        } else {
            // Convert Multiaddr to strings
            self.config
                .bootstrap_peers
                .iter()
                .map(|addr| addr.to_string())
                .collect::<Vec<_>>()
        };

        if !cli_bootstrap_peers.is_empty() {
            info!(
                "Using {} CLI-provided bootstrap peers (priority)",
                cli_bootstrap_peers.len()
            );
            for addr in &cli_bootstrap_peers {
                if let Ok(socket_addr) = addr.parse::<std::net::SocketAddr>() {
                    seen_addresses.insert(socket_addr);
                    // Placeholder peer ID; it is replaced by the ID returned from
                    // `connect_peer` if the connection succeeds.
                    // NOTE(review): only the first 8 characters of the address are
                    // used, so addresses sharing that prefix get the same placeholder.
                    let contact = ContactEntry::new(
                        format!("cli_peer_{}", addr.chars().take(8).collect::<String>()),
                        vec![socket_addr],
                    );
                    bootstrap_contacts.push(contact);
                } else {
                    warn!("Invalid bootstrap address format: {}", addr);
                }
            }
        }

        // Supplement with cached bootstrap peers (after CLI peers)
        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
            // Read guard lives until the end of this `if let` block, i.e. it is
            // released before the connection loop below takes write locks.
            let manager = bootstrap_manager.read().await;
            match manager.get_bootstrap_peers(20).await {
                // Try to get top 20 quality peers
                Ok(contacts) => {
                    if !contacts.is_empty() {
                        let mut added_from_cache = 0;
                        for contact in contacts {
                            // Only add if we haven't already added this address from CLI
                            let new_addresses: Vec<_> = contact
                                .addresses
                                .iter()
                                .filter(|addr| !seen_addresses.contains(addr))
                                .copied()
                                .collect();

                            if !new_addresses.is_empty() {
                                for addr in &new_addresses {
                                    seen_addresses.insert(*addr);
                                }
                                // Queue a copy of the contact limited to the addresses
                                // that were not seen before.
                                let mut contact = contact.clone();
                                contact.addresses = new_addresses;
                                bootstrap_contacts.push(contact);
                                added_from_cache += 1;
                            }
                        }
                        if added_from_cache > 0 {
                            info!(
                                "Added {} cached bootstrap peers (supplementing CLI peers)",
                                added_from_cache
                            );
                            used_cache = true;
                        }
                    }
                }
                Err(e) => {
                    warn!("Failed to get cached bootstrap peers: {}", e);
                }
            }
        }

        if bootstrap_contacts.is_empty() {
            info!("No bootstrap peers configured and no cached peers available");
            return Ok(());
        }

        // Connect to bootstrap peers
        // Counts contacts reached: the inner loop stops at the first address that works.
        let mut successful_connections = 0;
        for contact in bootstrap_contacts {
            for addr in &contact.addresses {
                match self.connect_peer(&addr.to_string()).await {
                    Ok(peer_id) => {
                        info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
                        successful_connections += 1;

                        // Update bootstrap cache with successful connection
                        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
                            let mut manager = bootstrap_manager.write().await;
                            let mut updated_contact = contact.clone();
                            updated_contact.peer_id = peer_id.clone();
                            // NOTE(review): the latency is a fixed 100 ms, not a measurement.
                            updated_contact.update_connection_result(true, Some(100), None); // Assume 100ms latency for now

                            if let Err(e) = manager.add_contact(updated_contact).await {
                                warn!("Failed to update bootstrap cache: {}", e);
                            }
                        }
                        break; // Successfully connected, move to next contact
                    }
                    Err(e) => {
                        warn!("Failed to connect to bootstrap peer {}: {}", addr, e);

                        // Update bootstrap cache with failed connection
                        // The condition looks only at `used_cache`, not at where this
                        // contact came from, so a configured contact (still carrying its
                        // placeholder ID) is written to the cache here as well.
                        if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
                            let mut manager = bootstrap_manager.write().await;
                            let mut updated_contact = contact.clone();
                            updated_contact.update_connection_result(
                                false,
                                None,
                                Some(e.to_string()),
                            );

                            if let Err(e) = manager.add_contact(updated_contact).await {
                                warn!("Failed to update bootstrap cache: {}", e);
                            }
                        }
                    }
                }
            }
        }

        if successful_connections == 0 {
            // The summary warning is emitted only when no cached contacts were queued.
            if !used_cache {
                warn!("Failed to connect to any bootstrap peers");
            }
            // Starting a node should not be gated on immediate bootstrap connectivity.
            // Keep running and allow background discovery / retries to populate peers later.
            return Ok(());
        }
        info!(
            "Successfully connected to {} bootstrap peers",
            successful_connections
        );

        Ok(())
    }
2601
2602    /// Disconnect from all peers
2603    async fn disconnect_all_peers(&self) -> Result<()> {
2604        let peer_ids: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();
2605
2606        for peer_id in peer_ids {
2607            self.disconnect_peer(&peer_id).await?;
2608        }
2609
2610        Ok(())
2611    }
2612
    /// Perform periodic maintenance tasks.
    ///
    /// Currently a placeholder: it performs no work and always returns `Ok(())`.
    async fn periodic_tasks(&self) -> Result<()> {
        // Planned work, none of which is implemented yet:
        // - Update peer last seen timestamps
        // - Remove stale connections
        // - Perform DHT maintenance

        Ok(())
    }
2622}
2623
/// Network sender trait for sending messages.
///
/// Implementors must be `Send + Sync`.
#[async_trait::async_trait]
pub trait NetworkSender: Send + Sync {
    /// Send `data` to the peer identified by `peer_id`, tagged with `protocol`.
    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;

    /// Get our local peer ID
    fn local_peer_id(&self) -> &PeerId;
}
2633
/// Lightweight wrapper for P2PNode to implement NetworkSender.
///
/// It holds a peer ID and the sending half of an unbounded channel. Cloning it
/// clones both fields.
#[derive(Clone)]
pub struct P2PNetworkSender {
    /// Peer ID returned by `local_peer_id`.
    peer_id: PeerId,
    // Use channels for async communication with the P2P node.
    // Each item is a (destination peer, protocol, payload) tuple.
    send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
}
2641
impl P2PNetworkSender {
    /// Create a sender from a peer ID and the sending half of the channel that
    /// carries (destination peer, protocol, payload) tuples.
    pub fn new(
        peer_id: PeerId,
        send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
    ) -> Self {
        Self { peer_id, send_tx }
    }
}
2650
2651/// Implementation of NetworkSender trait for P2PNetworkSender
2652#[async_trait::async_trait]
2653impl NetworkSender for P2PNetworkSender {
2654    /// Send a message to a specific peer via the P2P network
2655    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
2656        self.send_tx
2657            .send((peer_id.clone(), protocol.to_string(), data))
2658            .map_err(|_| {
2659                P2PError::Network(crate::error::NetworkError::ProtocolError(
2660                    "Failed to send message via channel".to_string().into(),
2661                ))
2662            })?;
2663        Ok(())
2664    }
2665
2666    /// Get our local peer ID
2667    fn local_peer_id(&self) -> &PeerId {
2668        &self.peer_id
2669    }
2670}
2671
/// Builder pattern for creating P2P nodes.
///
/// Wraps a `NodeConfig` that the builder methods modify before `build` hands it
/// to `P2PNode::new`.
pub struct NodeBuilder {
    /// Configuration accumulated so far.
    config: NodeConfig,
}
2676
2677impl Default for NodeBuilder {
2678    fn default() -> Self {
2679        Self::new()
2680    }
2681}
2682
2683impl NodeBuilder {
2684    /// Create a new node builder
2685    pub fn new() -> Self {
2686        Self {
2687            config: NodeConfig::default(),
2688        }
2689    }
2690
2691    /// Set the peer ID
2692    pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
2693        self.config.peer_id = Some(peer_id);
2694        self
2695    }
2696
2697    /// Add a listen address
2698    pub fn listen_on(mut self, addr: &str) -> Self {
2699        if let Ok(multiaddr) = addr.parse() {
2700            self.config.listen_addrs.push(multiaddr);
2701        }
2702        self
2703    }
2704
2705    /// Add a bootstrap peer
2706    pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
2707        if let Ok(multiaddr) = addr.parse() {
2708            self.config.bootstrap_peers.push(multiaddr);
2709        }
2710        self.config.bootstrap_peers_str.push(addr.to_string());
2711        self
2712    }
2713
2714    /// Enable IPv6 support
2715    pub fn with_ipv6(mut self, enable: bool) -> Self {
2716        self.config.enable_ipv6 = enable;
2717        self
2718    }
2719
2720    // MCP removed: builder methods deleted
2721
2722    /// Set connection timeout
2723    pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
2724        self.config.connection_timeout = timeout;
2725        self
2726    }
2727
2728    /// Set maximum connections
2729    pub fn with_max_connections(mut self, max: usize) -> Self {
2730        self.config.max_connections = max;
2731        self
2732    }
2733
2734    /// Enable production mode with default configuration
2735    pub fn with_production_mode(mut self) -> Self {
2736        self.config.production_config = Some(ProductionConfig::default());
2737        self
2738    }
2739
2740    /// Configure production settings
2741    pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
2742        self.config.production_config = Some(production_config);
2743        self
2744    }
2745
2746    /// Configure IP diversity limits for Sybil protection.
2747    pub fn with_diversity_config(
2748        mut self,
2749        diversity_config: crate::security::IPDiversityConfig,
2750    ) -> Self {
2751        self.config.diversity_config = Some(diversity_config);
2752        self
2753    }
2754
2755    /// Configure DHT settings
2756    pub fn with_dht(mut self, dht_config: DHTConfig) -> Self {
2757        self.config.dht_config = dht_config;
2758        self
2759    }
2760
2761    /// Enable DHT with default configuration
2762    pub fn with_default_dht(mut self) -> Self {
2763        self.config.dht_config = DHTConfig::default();
2764        self
2765    }
2766
2767    /// Build the P2P node
2768    pub async fn build(self) -> Result<P2PNode> {
2769        P2PNode::new(self.config).await
2770    }
2771}
2772
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod diversity_tests {
    use super::*;
    use crate::security::IPDiversityConfig;

    /// Build a `BootstrapManager` from a `NodeConfig`: the diversity config falls back
    /// to its default, the cache config to `CacheConfig::default()`, and the join rate
    /// limiter always uses its default.
    async fn build_bootstrap_manager_like_prod(config: &NodeConfig) -> BootstrapManager {
        let diversity_config = config.diversity_config.clone().unwrap_or_default();
        let cache_config = match config.bootstrap_cache_config {
            Some(ref configured) => configured.clone(),
            None => crate::bootstrap::CacheConfig::default(),
        };

        BootstrapManager::with_full_config(
            cache_config,
            crate::rate_limit::JoinRateLimiterConfig::default(),
            diversity_config,
        )
        .await
        .expect("bootstrap manager")
    }

    /// A diversity config set on `NodeConfig` must reach the bootstrap manager.
    #[tokio::test]
    async fn test_nodeconfig_diversity_config_used_for_bootstrap() {
        let config = NodeConfig {
            diversity_config: Some(IPDiversityConfig::testnet()),
            ..Default::default()
        };

        let manager = build_bootstrap_manager_like_prod(&config).await;

        let applied = manager.diversity_config();
        assert!(applied.is_relaxed());
        assert_eq!(applied.max_nodes_per_asn, 5000);
    }
}
2812
2813/// Standalone function to handle received messages without borrowing self
2814#[allow(dead_code)] // Deprecated during ant-quic migration
2815async fn handle_received_message_standalone(
2816    message_data: Vec<u8>,
2817    peer_id: &PeerId,
2818    _protocol: &str,
2819    event_tx: &broadcast::Sender<P2PEvent>,
2820) -> Result<()> {
2821    // Parse the message format
2822    match serde_json::from_slice::<serde_json::Value>(&message_data) {
2823        Ok(message) => {
2824            if let (Some(protocol), Some(data), Some(from)) = (
2825                message.get("protocol").and_then(|v| v.as_str()),
2826                message.get("data").and_then(|v| v.as_array()),
2827                message.get("from").and_then(|v| v.as_str()),
2828            ) {
2829                // Convert data array back to bytes
2830                let data_bytes: Vec<u8> = data
2831                    .iter()
2832                    .filter_map(|v| v.as_u64().map(|n| n as u8))
2833                    .collect();
2834
2835                // Generate message event
2836                let event = P2PEvent::Message {
2837                    topic: protocol.to_string(),
2838                    source: from.to_string(),
2839                    data: data_bytes,
2840                };
2841
2842                let _ = event_tx.send(event);
2843                debug!("Generated message event from peer: {}", peer_id);
2844            }
2845        }
2846        Err(e) => {
2847            warn!("Failed to parse received message from {}: {}", peer_id, e);
2848        }
2849    }
2850
2851    Ok(())
2852}
2853
2854// MCP removed: standalone MCP handler deleted
2855
2856/// Helper function to handle protocol message creation
2857#[allow(dead_code)]
2858fn handle_protocol_message_creation(protocol: &str, data: Vec<u8>) -> Option<Vec<u8>> {
2859    match create_protocol_message_static(protocol, data) {
2860        Ok(msg) => Some(msg),
2861        Err(e) => {
2862            warn!("Failed to create protocol message: {}", e);
2863            None
2864        }
2865    }
2866}
2867
2868/// Helper function to handle message send result
2869#[allow(dead_code)]
2870async fn handle_message_send_result(result: crate::error::P2pResult<()>, peer_id: &PeerId) {
2871    match result {
2872        Ok(_) => {
2873            debug!("Message sent to peer {} via transport layer", peer_id);
2874        }
2875        Err(e) => {
2876            warn!("Failed to send message to peer {}: {}", peer_id, e);
2877        }
2878    }
2879}
2880
2881/// Helper function to check rate limit
2882#[allow(dead_code)] // Deprecated during ant-quic migration
2883fn check_rate_limit(
2884    rate_limiter: &RateLimiter,
2885    socket_addr: &std::net::SocketAddr,
2886    remote_addr: &NetworkAddress,
2887) -> Result<()> {
2888    rate_limiter.check_ip(&socket_addr.ip()).map_err(|e| {
2889        warn!("Rate limit exceeded for {}: {}", remote_addr, e);
2890        e
2891    })
2892}
2893
2894/// Helper function to register a new peer
2895#[allow(dead_code)] // Deprecated during ant-quic migration
2896async fn register_new_peer(
2897    peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2898    peer_id: &PeerId,
2899    remote_addr: &NetworkAddress,
2900) {
2901    let mut peers_guard = peers.write().await;
2902    let peer_info = PeerInfo {
2903        peer_id: peer_id.clone(),
2904        addresses: vec![remote_addr.to_string()],
2905        connected_at: tokio::time::Instant::now(),
2906        last_seen: tokio::time::Instant::now(),
2907        status: ConnectionStatus::Connected,
2908        protocols: vec!["p2p-chat/1.0.0".to_string()],
2909        heartbeat_count: 0,
2910    };
2911    peers_guard.insert(peer_id.clone(), peer_info);
2912}
2913
2914/// Helper function to spawn connection handler
2915#[allow(dead_code)] // Deprecated during ant-quic migration
2916fn spawn_connection_handler(
2917    connection: Box<dyn crate::transport::Connection>,
2918    peer_id: PeerId,
2919    event_tx: broadcast::Sender<P2PEvent>,
2920    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2921) {
2922    tokio::spawn(async move {
2923        handle_peer_connection(connection, peer_id, event_tx, peers).await;
2924    });
2925}
2926
2927/// Helper function to handle peer connection
2928#[allow(dead_code)] // Deprecated during ant-quic migration
2929async fn handle_peer_connection(
2930    mut connection: Box<dyn crate::transport::Connection>,
2931    peer_id: PeerId,
2932    event_tx: broadcast::Sender<P2PEvent>,
2933    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2934) {
2935    loop {
2936        match connection.receive().await {
2937            Ok(message_data) => {
2938                debug!(
2939                    "Received {} bytes from peer: {}",
2940                    message_data.len(),
2941                    peer_id
2942                );
2943
2944                // Handle the received message
2945                if let Err(e) = handle_received_message_standalone(
2946                    message_data,
2947                    &peer_id,
2948                    "unknown", // TODO: Extract protocol from message
2949                    &event_tx,
2950                )
2951                .await
2952                {
2953                    warn!("Failed to handle message from peer {}: {}", peer_id, e);
2954                }
2955            }
2956            Err(e) => {
2957                warn!("Failed to receive message from {}: {}", peer_id, e);
2958
2959                // Check if connection is still alive
2960                if !connection.is_alive().await {
2961                    info!("Connection to {} is dead, removing peer", peer_id);
2962
2963                    // Remove dead peer
2964                    remove_peer(&peers, &peer_id).await;
2965
2966                    // Generate peer disconnected event
2967                    let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id.clone()));
2968
2969                    break; // Exit the message receiving loop
2970                }
2971
2972                // Brief pause before retrying
2973                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2974            }
2975        }
2976    }
2977}
2978
2979/// Helper function to remove a peer
2980#[allow(dead_code)] // Deprecated during ant-quic migration
2981async fn remove_peer(peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>, peer_id: &PeerId) {
2982    let mut peers_guard = peers.write().await;
2983    peers_guard.remove(peer_id);
2984}
2985
2986/// Helper function to update peer heartbeat
2987#[allow(dead_code)]
2988async fn update_peer_heartbeat(
2989    peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2990    peer_id: &PeerId,
2991) -> Result<()> {
2992    let mut peers_guard = peers.write().await;
2993    match peers_guard.get_mut(peer_id) {
2994        Some(peer_info) => {
2995            peer_info.last_seen = Instant::now();
2996            peer_info.heartbeat_count += 1;
2997            Ok(())
2998        }
2999        None => {
3000            warn!("Received heartbeat from unknown peer: {}", peer_id);
3001            Err(P2PError::Network(NetworkError::PeerNotFound(
3002                format!("Peer {} not found", peer_id).into(),
3003            )))
3004        }
3005    }
3006}
3007
3008/// Helper function to get resource metrics
3009#[allow(dead_code)]
3010async fn get_resource_metrics(resource_manager: &Option<Arc<ResourceManager>>) -> (u64, f64) {
3011    if let Some(manager) = resource_manager {
3012        let metrics = manager.get_metrics().await;
3013        (metrics.memory_used, metrics.cpu_usage)
3014    } else {
3015        (0, 0.0)
3016    }
3017}
3018
3019#[cfg(test)]
3020mod tests {
3021    use super::*;
3022    // MCP removed from tests
3023    use std::time::Duration;
3024    use tokio::time::timeout;
3025
3026    // Test tool handler for network tests
3027
3028    // MCP removed
3029
3030    /// Helper function to create a test node configuration
3031    fn create_test_node_config() -> NodeConfig {
3032        NodeConfig {
3033            peer_id: Some("test_peer_123".to_string()),
3034            listen_addrs: vec![
3035                std::net::SocketAddr::new(std::net::IpAddr::V6(std::net::Ipv6Addr::LOCALHOST), 0),
3036                std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
3037            ],
3038            listen_addr: std::net::SocketAddr::new(
3039                std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
3040                0,
3041            ),
3042            bootstrap_peers: vec![],
3043            bootstrap_peers_str: vec![],
3044            enable_ipv6: true,
3045
3046            connection_timeout: Duration::from_secs(2),
3047            keep_alive_interval: Duration::from_secs(30),
3048            max_connections: 100,
3049            max_incoming_connections: 50,
3050            dht_config: DHTConfig::default(),
3051            security_config: SecurityConfig::default(),
3052            production_config: None,
3053            bootstrap_cache_config: None,
3054            diversity_config: None,
3055            attestation_config: crate::attestation::AttestationConfig::default(),
3056        }
3057    }
3058
3059    /// Helper function to create a test tool
3060    // MCP removed: test tool helper deleted
3061
3062    #[tokio::test]
3063    async fn test_node_config_default() {
3064        let config = NodeConfig::default();
3065
3066        assert!(config.peer_id.is_none());
3067        assert_eq!(config.listen_addrs.len(), 2);
3068        assert!(config.enable_ipv6);
3069        assert_eq!(config.max_connections, 10000); // Fixed: matches actual default
3070        assert_eq!(config.max_incoming_connections, 100);
3071        assert_eq!(config.connection_timeout, Duration::from_secs(30));
3072    }
3073
3074    #[tokio::test]
3075    async fn test_dht_config_default() {
3076        let config = DHTConfig::default();
3077
3078        assert_eq!(config.k_value, 20);
3079        assert_eq!(config.alpha_value, 5);
3080        assert_eq!(config.record_ttl, Duration::from_secs(3600));
3081        assert_eq!(config.refresh_interval, Duration::from_secs(600));
3082    }
3083
3084    #[tokio::test]
3085    async fn test_security_config_default() {
3086        let config = SecurityConfig::default();
3087
3088        assert!(config.enable_noise);
3089        assert!(config.enable_tls);
3090        assert_eq!(config.trust_level, TrustLevel::Basic);
3091    }
3092
3093    #[test]
3094    fn test_trust_level_variants() {
3095        // Test that all trust level variants can be created
3096        let _none = TrustLevel::None;
3097        let _basic = TrustLevel::Basic;
3098        let _full = TrustLevel::Full;
3099
3100        // Test equality
3101        assert_eq!(TrustLevel::None, TrustLevel::None);
3102        assert_eq!(TrustLevel::Basic, TrustLevel::Basic);
3103        assert_eq!(TrustLevel::Full, TrustLevel::Full);
3104        assert_ne!(TrustLevel::None, TrustLevel::Basic);
3105    }
3106
3107    #[test]
3108    fn test_connection_status_variants() {
3109        let connecting = ConnectionStatus::Connecting;
3110        let connected = ConnectionStatus::Connected;
3111        let disconnecting = ConnectionStatus::Disconnecting;
3112        let disconnected = ConnectionStatus::Disconnected;
3113        let failed = ConnectionStatus::Failed("test error".to_string());
3114
3115        assert_eq!(connecting, ConnectionStatus::Connecting);
3116        assert_eq!(connected, ConnectionStatus::Connected);
3117        assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
3118        assert_eq!(disconnected, ConnectionStatus::Disconnected);
3119        assert_ne!(connecting, connected);
3120
3121        if let ConnectionStatus::Failed(msg) = failed {
3122            assert_eq!(msg, "test error");
3123        } else {
3124            panic!("Expected Failed status");
3125        }
3126    }
3127
3128    #[tokio::test]
3129    async fn test_node_creation() -> Result<()> {
3130        let config = create_test_node_config();
3131        let node = P2PNode::new(config).await?;
3132
3133        assert_eq!(node.peer_id(), "test_peer_123");
3134        assert!(!node.is_running().await);
3135        assert_eq!(node.peer_count().await, 0);
3136        assert!(node.connected_peers().await.is_empty());
3137
3138        Ok(())
3139    }
3140
3141    #[tokio::test]
3142    async fn test_node_creation_without_peer_id() -> Result<()> {
3143        let mut config = create_test_node_config();
3144        config.peer_id = None;
3145
3146        let node = P2PNode::new(config).await?;
3147
3148        // Should have generated a peer ID
3149        assert!(node.peer_id().starts_with("peer_"));
3150        assert!(!node.is_running().await);
3151
3152        Ok(())
3153    }
3154
3155    #[tokio::test]
3156    async fn test_node_lifecycle() -> Result<()> {
3157        let config = create_test_node_config();
3158        let node = P2PNode::new(config).await?;
3159
3160        // Initially not running
3161        assert!(!node.is_running().await);
3162
3163        // Start the node
3164        node.start().await?;
3165        assert!(node.is_running().await);
3166
3167        // Check listen addresses were set (at least one)
3168        let listen_addrs = node.listen_addrs().await;
3169        assert!(
3170            !listen_addrs.is_empty(),
3171            "Expected at least one listening address"
3172        );
3173
3174        // Stop the node
3175        node.stop().await?;
3176        assert!(!node.is_running().await);
3177
3178        Ok(())
3179    }
3180
3181    #[tokio::test]
3182    async fn test_peer_connection() -> Result<()> {
3183        let config1 = create_test_node_config();
3184        let mut config2 = create_test_node_config();
3185        config2.peer_id = Some("test_peer_456".to_string());
3186
3187        let node1 = P2PNode::new(config1).await?;
3188        let node2 = P2PNode::new(config2).await?;
3189
3190        node1.start().await?;
3191        node2.start().await?;
3192
3193        let node2_addr = node2
3194            .listen_addrs()
3195            .await
3196            .into_iter()
3197            .find(|a| a.ip().is_ipv4())
3198            .ok_or_else(|| {
3199                P2PError::Network(crate::error::NetworkError::InvalidAddress(
3200                    "Node 2 did not expose an IPv4 listen address".into(),
3201                ))
3202            })?;
3203
3204        // Connect to a real peer
3205        let peer_id = node1.connect_peer(&node2_addr.to_string()).await?;
3206
3207        // Check peer count
3208        assert_eq!(node1.peer_count().await, 1);
3209
3210        // Check connected peers
3211        let connected_peers = node1.connected_peers().await;
3212        assert_eq!(connected_peers.len(), 1);
3213        assert_eq!(connected_peers[0], peer_id);
3214
3215        // Get peer info
3216        let peer_info = node1.peer_info(&peer_id).await;
3217        assert!(peer_info.is_some());
3218        let info = peer_info.expect("Peer info should exist after adding peer");
3219        assert_eq!(info.peer_id, peer_id);
3220        assert_eq!(info.status, ConnectionStatus::Connected);
3221        assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
3222
3223        // Disconnect from peer
3224        node1.disconnect_peer(&peer_id).await?;
3225        assert_eq!(node1.peer_count().await, 0);
3226
3227        node1.stop().await?;
3228        node2.stop().await?;
3229
3230        Ok(())
3231    }
3232
3233    // TODO(windows): Investigate QUIC connection issues on Windows CI
3234    // This test consistently fails on Windows GitHub Actions runners with
3235    // "All connect attempts failed" even with IPv4-only config, long delays,
3236    // and multiple retry attempts. The underlying ant-quic library may have
3237    // issues on Windows that need investigation.
3238    // See: https://github.com/dirvine/saorsa-core/issues/TBD
3239    #[cfg_attr(target_os = "windows", ignore)]
3240    #[tokio::test]
3241    async fn test_event_subscription() -> Result<()> {
3242        // Configure both nodes to use only IPv4 for reliable cross-platform testing
3243        // This is important because:
3244        // 1. local_addr() returns the first address from listen_addrs
3245        // 2. The default config puts IPv6 first, which may not work on all Windows setups
3246        let ipv4_localhost =
3247            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
3248
3249        let mut config1 = create_test_node_config();
3250        config1.listen_addr = ipv4_localhost;
3251        config1.listen_addrs = vec![ipv4_localhost];
3252        config1.enable_ipv6 = false;
3253
3254        let mut config2 = create_test_node_config();
3255        config2.peer_id = Some("test_peer_456".to_string());
3256        config2.listen_addr = ipv4_localhost;
3257        config2.listen_addrs = vec![ipv4_localhost];
3258        config2.enable_ipv6 = false;
3259
3260        let node1 = P2PNode::new(config1).await?;
3261        let node2 = P2PNode::new(config2).await?;
3262
3263        node1.start().await?;
3264        node2.start().await?;
3265
3266        // Wait for nodes to fully bind their listening sockets
3267        // Windows network stack initialization can be significantly slower
3268        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
3269
3270        let mut events = node1.subscribe_events();
3271
3272        // Get the actual listening address using local_addr() for reliability
3273        let node2_addr = node2.local_addr().ok_or_else(|| {
3274            P2PError::Network(crate::error::NetworkError::ProtocolError(
3275                "No listening address".to_string().into(),
3276            ))
3277        })?;
3278
3279        // Connect to a peer with retry logic for Windows reliability
3280        // The QUIC library may need additional time to fully initialize
3281        let mut peer_id = None;
3282        for attempt in 0..3 {
3283            if attempt > 0 {
3284                tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
3285            }
3286            match timeout(Duration::from_secs(2), node1.connect_peer(&node2_addr)).await {
3287                Ok(Ok(id)) => {
3288                    peer_id = Some(id);
3289                    break;
3290                }
3291                Ok(Err(_)) | Err(_) => continue,
3292            }
3293        }
3294        let peer_id = peer_id.ok_or_else(|| {
3295            P2PError::Network(crate::error::NetworkError::ProtocolError(
3296                "Failed to connect after 3 attempts".to_string().into(),
3297            ))
3298        })?;
3299
3300        // Check for PeerConnected event
3301        let event = timeout(Duration::from_secs(2), events.recv()).await;
3302        assert!(event.is_ok());
3303
3304        let event_result = event
3305            .expect("Should receive event")
3306            .expect("Event should not be error");
3307        match event_result {
3308            P2PEvent::PeerConnected(event_peer_id) => {
3309                assert_eq!(event_peer_id, peer_id);
3310            }
3311            _ => panic!("Expected PeerConnected event"),
3312        }
3313
3314        // Disconnect from peer (this should emit another event)
3315        node1.disconnect_peer(&peer_id).await?;
3316
3317        // Check for PeerDisconnected event
3318        let event = timeout(Duration::from_secs(2), events.recv()).await;
3319        assert!(event.is_ok());
3320
3321        let event_result = event
3322            .expect("Should receive event")
3323            .expect("Event should not be error");
3324        match event_result {
3325            P2PEvent::PeerDisconnected(event_peer_id) => {
3326                assert_eq!(event_peer_id, peer_id);
3327            }
3328            _ => panic!("Expected PeerDisconnected event"),
3329        }
3330
3331        node1.stop().await?;
3332        node2.stop().await?;
3333
3334        Ok(())
3335    }
3336
3337    // TODO(windows): Same QUIC connection issues as test_event_subscription
3338    #[cfg_attr(target_os = "windows", ignore)]
3339    #[tokio::test]
3340    async fn test_message_sending() -> Result<()> {
3341        // Create two nodes
3342        let mut config1 = create_test_node_config();
3343        config1.listen_addr =
3344            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
3345        let node1 = P2PNode::new(config1).await?;
3346        node1.start().await?;
3347
3348        let mut config2 = create_test_node_config();
3349        config2.listen_addr =
3350            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
3351        let node2 = P2PNode::new(config2).await?;
3352        node2.start().await?;
3353
3354        // Wait a bit for nodes to start listening
3355        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
3356
3357        // Get actual listening address of node2
3358        let node2_addr = node2.local_addr().ok_or_else(|| {
3359            P2PError::Network(crate::error::NetworkError::ProtocolError(
3360                "No listening address".to_string().into(),
3361            ))
3362        })?;
3363
3364        // Connect node1 to node2
3365        let peer_id =
3366            match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
3367                Ok(res) => res?,
3368                Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
3369            };
3370
3371        // Wait a bit for connection to establish
3372        tokio::time::sleep(tokio::time::Duration::from_millis(30)).await;
3373
3374        // Send a message
3375        let message_data = b"Hello, peer!".to_vec();
3376        let result = match timeout(
3377            Duration::from_millis(500),
3378            node1.send_message(&peer_id, "test-protocol", message_data),
3379        )
3380        .await
3381        {
3382            Ok(res) => res,
3383            Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
3384        };
3385        // For now, we'll just check that we don't get a "not connected" error
3386        // The actual send might fail due to no handler on the other side
3387        if let Err(e) = &result {
3388            assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
3389        }
3390
3391        // Try to send to non-existent peer
3392        let non_existent_peer = "non_existent_peer".to_string();
3393        let result = node1
3394            .send_message(&non_existent_peer, "test-protocol", vec![])
3395            .await;
3396        assert!(result.is_err(), "Sending to non-existent peer should fail");
3397
3398        Ok(())
3399    }
3400
3401    #[tokio::test]
3402    async fn test_remote_mcp_operations() -> Result<()> {
3403        let config = create_test_node_config();
3404        let node = P2PNode::new(config).await?;
3405
3406        // MCP removed; test reduced to simple start/stop
3407        node.start().await?;
3408        node.stop().await?;
3409        Ok(())
3410    }
3411
3412    #[tokio::test]
3413    async fn test_health_check() -> Result<()> {
3414        let config = create_test_node_config();
3415        let node = P2PNode::new(config).await?;
3416
3417        // Health check should pass with no connections
3418        let result = node.health_check().await;
3419        assert!(result.is_ok());
3420
3421        // Note: We're not actually connecting to real peers here
3422        // since that would require running bootstrap nodes.
3423        // The health check should still pass with no connections.
3424
3425        Ok(())
3426    }
3427
3428    #[tokio::test]
3429    async fn test_node_uptime() -> Result<()> {
3430        let config = create_test_node_config();
3431        let node = P2PNode::new(config).await?;
3432
3433        let uptime1 = node.uptime();
3434        assert!(uptime1 >= Duration::from_secs(0));
3435
3436        // Wait a bit
3437        tokio::time::sleep(Duration::from_millis(10)).await;
3438
3439        let uptime2 = node.uptime();
3440        assert!(uptime2 > uptime1);
3441
3442        Ok(())
3443    }
3444
3445    #[tokio::test]
3446    async fn test_node_config_access() -> Result<()> {
3447        let config = create_test_node_config();
3448        let expected_peer_id = config.peer_id.clone();
3449        let node = P2PNode::new(config).await?;
3450
3451        let node_config = node.config();
3452        assert_eq!(node_config.peer_id, expected_peer_id);
3453        assert_eq!(node_config.max_connections, 100);
3454        // MCP removed
3455
3456        Ok(())
3457    }
3458
3459    #[tokio::test]
3460    async fn test_mcp_server_access() -> Result<()> {
3461        let config = create_test_node_config();
3462        let _node = P2PNode::new(config).await?;
3463
3464        // MCP removed
3465        Ok(())
3466    }
3467
3468    #[tokio::test]
3469    async fn test_dht_access() -> Result<()> {
3470        let config = create_test_node_config();
3471        let node = P2PNode::new(config).await?;
3472
3473        // Should have DHT
3474        assert!(node.dht().is_some());
3475
3476        Ok(())
3477    }
3478
3479    #[tokio::test]
3480    async fn test_node_builder() -> Result<()> {
3481        // Create a config using the builder but don't actually build a real node
3482        let builder = P2PNode::builder()
3483            .with_peer_id("builder_test_peer".to_string())
3484            .listen_on("/ip4/127.0.0.1/tcp/0")
3485            .listen_on("/ip6/::1/tcp/0")
3486            .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9000") // Use a valid port number
3487            .with_ipv6(true)
3488            .with_connection_timeout(Duration::from_secs(15))
3489            .with_max_connections(200);
3490
3491        // Test the configuration that was built
3492        let config = builder.config;
3493        assert_eq!(config.peer_id, Some("builder_test_peer".to_string()));
3494        assert_eq!(config.listen_addrs.len(), 2); // 2 added by builder (no defaults)
3495        assert_eq!(config.bootstrap_peers_str.len(), 1); // Check bootstrap_peers_str instead
3496        assert!(config.enable_ipv6);
3497        assert_eq!(config.connection_timeout, Duration::from_secs(15));
3498        assert_eq!(config.max_connections, 200);
3499
3500        Ok(())
3501    }
3502
3503    #[tokio::test]
3504    async fn test_bootstrap_peers() -> Result<()> {
3505        let mut config = create_test_node_config();
3506        config.bootstrap_peers = vec![
3507            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9200),
3508            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9201),
3509        ];
3510
3511        let node = P2PNode::new(config).await?;
3512
3513        // Start node (which attempts to connect to bootstrap peers)
3514        node.start().await?;
3515
3516        // In a test environment, bootstrap peers may not be available
3517        // The test verifies the node starts correctly with bootstrap configuration
3518        // Peer count may include local/internal tracking, so we just verify it's reasonable
3519        let _peer_count = node.peer_count().await;
3520
3521        node.stop().await?;
3522        Ok(())
3523    }
3524
3525    #[tokio::test]
3526    async fn test_production_mode_disabled() -> Result<()> {
3527        let config = create_test_node_config();
3528        let node = P2PNode::new(config).await?;
3529
3530        assert!(!node.is_production_mode());
3531        assert!(node.production_config().is_none());
3532
3533        // Resource metrics should fail when production mode is disabled
3534        let result = node.resource_metrics().await;
3535        assert!(result.is_err());
3536        assert!(result.unwrap_err().to_string().contains("not enabled"));
3537
3538        Ok(())
3539    }
3540
3541    #[tokio::test]
3542    async fn test_network_event_variants() {
3543        // Test that all network event variants can be created
3544        let peer_id = "test_peer".to_string();
3545        let address = "/ip4/127.0.0.1/tcp/9000".to_string();
3546
3547        let _peer_connected = NetworkEvent::PeerConnected {
3548            peer_id: peer_id.clone(),
3549            addresses: vec![address.clone()],
3550        };
3551
3552        let _peer_disconnected = NetworkEvent::PeerDisconnected {
3553            peer_id: peer_id.clone(),
3554            reason: "test disconnect".to_string(),
3555        };
3556
3557        let _message_received = NetworkEvent::MessageReceived {
3558            peer_id: peer_id.clone(),
3559            protocol: "test-protocol".to_string(),
3560            data: vec![1, 2, 3],
3561        };
3562
3563        let _connection_failed = NetworkEvent::ConnectionFailed {
3564            peer_id: Some(peer_id.clone()),
3565            address: address.clone(),
3566            error: "connection refused".to_string(),
3567        };
3568
3569        let _dht_stored = NetworkEvent::DHTRecordStored {
3570            key: vec![1, 2, 3],
3571            value: vec![4, 5, 6],
3572        };
3573
3574        let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
3575            key: vec![1, 2, 3],
3576            value: Some(vec![4, 5, 6]),
3577        };
3578    }
3579
3580    #[tokio::test]
3581    async fn test_peer_info_structure() {
3582        let peer_info = PeerInfo {
3583            peer_id: "test_peer".to_string(),
3584            addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
3585            connected_at: Instant::now(),
3586            last_seen: Instant::now(),
3587            status: ConnectionStatus::Connected,
3588            protocols: vec!["test-protocol".to_string()],
3589            heartbeat_count: 0,
3590        };
3591
3592        assert_eq!(peer_info.peer_id, "test_peer");
3593        assert_eq!(peer_info.addresses.len(), 1);
3594        assert_eq!(peer_info.status, ConnectionStatus::Connected);
3595        assert_eq!(peer_info.protocols.len(), 1);
3596    }
3597
3598    #[tokio::test]
3599    async fn test_serialization() -> Result<()> {
3600        // Test that configs can be serialized/deserialized
3601        let config = create_test_node_config();
3602        let serialized = serde_json::to_string(&config)?;
3603        let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
3604
3605        assert_eq!(config.peer_id, deserialized.peer_id);
3606        assert_eq!(config.listen_addrs, deserialized.listen_addrs);
3607        assert_eq!(config.enable_ipv6, deserialized.enable_ipv6);
3608
3609        Ok(())
3610    }
3611
3612    #[tokio::test]
3613    async fn test_get_peer_id_by_address_found() -> Result<()> {
3614        let config = create_test_node_config();
3615        let node = P2PNode::new(config).await?;
3616
3617        // Manually insert a peer for testing
3618        let test_peer_id = "peer_test_123".to_string();
3619        let test_address = "192.168.1.100:9000".to_string();
3620
3621        let peer_info = PeerInfo {
3622            peer_id: test_peer_id.clone(),
3623            addresses: vec![test_address.clone()],
3624            connected_at: Instant::now(),
3625            last_seen: Instant::now(),
3626            status: ConnectionStatus::Connected,
3627            protocols: vec!["test-protocol".to_string()],
3628            heartbeat_count: 0,
3629        };
3630
3631        node.peers
3632            .write()
3633            .await
3634            .insert(test_peer_id.clone(), peer_info);
3635
3636        // Test: Find peer by address
3637        let found_peer_id = node.get_peer_id_by_address(&test_address).await;
3638        assert_eq!(found_peer_id, Some(test_peer_id));
3639
3640        Ok(())
3641    }
3642
3643    #[tokio::test]
3644    async fn test_get_peer_id_by_address_not_found() -> Result<()> {
3645        let config = create_test_node_config();
3646        let node = P2PNode::new(config).await?;
3647
3648        // Test: Try to find a peer that doesn't exist
3649        let result = node.get_peer_id_by_address("192.168.1.200:9000").await;
3650        assert_eq!(result, None);
3651
3652        Ok(())
3653    }
3654
3655    #[tokio::test]
3656    async fn test_get_peer_id_by_address_invalid_format() -> Result<()> {
3657        let config = create_test_node_config();
3658        let node = P2PNode::new(config).await?;
3659
3660        // Test: Invalid address format should return None
3661        let result = node.get_peer_id_by_address("invalid-address").await;
3662        assert_eq!(result, None);
3663
3664        Ok(())
3665    }
3666
3667    #[tokio::test]
3668    async fn test_get_peer_id_by_address_multiple_peers() -> Result<()> {
3669        let config = create_test_node_config();
3670        let node = P2PNode::new(config).await?;
3671
3672        // Add multiple peers with different addresses
3673        let peer1_id = "peer_1".to_string();
3674        let peer1_addr = "192.168.1.101:9001".to_string();
3675
3676        let peer2_id = "peer_2".to_string();
3677        let peer2_addr = "192.168.1.102:9002".to_string();
3678
3679        let peer1_info = PeerInfo {
3680            peer_id: peer1_id.clone(),
3681            addresses: vec![peer1_addr.clone()],
3682            connected_at: Instant::now(),
3683            last_seen: Instant::now(),
3684            status: ConnectionStatus::Connected,
3685            protocols: vec!["test-protocol".to_string()],
3686            heartbeat_count: 0,
3687        };
3688
3689        let peer2_info = PeerInfo {
3690            peer_id: peer2_id.clone(),
3691            addresses: vec![peer2_addr.clone()],
3692            connected_at: Instant::now(),
3693            last_seen: Instant::now(),
3694            status: ConnectionStatus::Connected,
3695            protocols: vec!["test-protocol".to_string()],
3696            heartbeat_count: 0,
3697        };
3698
3699        node.peers
3700            .write()
3701            .await
3702            .insert(peer1_id.clone(), peer1_info);
3703        node.peers
3704            .write()
3705            .await
3706            .insert(peer2_id.clone(), peer2_info);
3707
3708        // Test: Find each peer by their unique address
3709        let found_peer1 = node.get_peer_id_by_address(&peer1_addr).await;
3710        let found_peer2 = node.get_peer_id_by_address(&peer2_addr).await;
3711
3712        assert_eq!(found_peer1, Some(peer1_id));
3713        assert_eq!(found_peer2, Some(peer2_id));
3714
3715        Ok(())
3716    }
3717
3718    #[tokio::test]
3719    async fn test_list_active_connections_empty() -> Result<()> {
3720        let config = create_test_node_config();
3721        let node = P2PNode::new(config).await?;
3722
3723        // Test: No connections initially
3724        let connections = node.list_active_connections().await;
3725        assert!(connections.is_empty());
3726
3727        Ok(())
3728    }
3729
3730    #[tokio::test]
3731    async fn test_list_active_connections_with_peers() -> Result<()> {
3732        let config = create_test_node_config();
3733        let node = P2PNode::new(config).await?;
3734
3735        // Add multiple peers
3736        let peer1_id = "peer_1".to_string();
3737        let peer1_addrs = vec![
3738            "192.168.1.101:9001".to_string(),
3739            "192.168.1.101:9002".to_string(),
3740        ];
3741
3742        let peer2_id = "peer_2".to_string();
3743        let peer2_addrs = vec!["192.168.1.102:9003".to_string()];
3744
3745        let peer1_info = PeerInfo {
3746            peer_id: peer1_id.clone(),
3747            addresses: peer1_addrs.clone(),
3748            connected_at: Instant::now(),
3749            last_seen: Instant::now(),
3750            status: ConnectionStatus::Connected,
3751            protocols: vec!["test-protocol".to_string()],
3752            heartbeat_count: 0,
3753        };
3754
3755        let peer2_info = PeerInfo {
3756            peer_id: peer2_id.clone(),
3757            addresses: peer2_addrs.clone(),
3758            connected_at: Instant::now(),
3759            last_seen: Instant::now(),
3760            status: ConnectionStatus::Connected,
3761            protocols: vec!["test-protocol".to_string()],
3762            heartbeat_count: 0,
3763        };
3764
3765        node.peers
3766            .write()
3767            .await
3768            .insert(peer1_id.clone(), peer1_info);
3769        node.peers
3770            .write()
3771            .await
3772            .insert(peer2_id.clone(), peer2_info);
3773
3774        // Also add to active_connections (list_active_connections iterates over this)
3775        node.active_connections
3776            .write()
3777            .await
3778            .insert(peer1_id.clone());
3779        node.active_connections
3780            .write()
3781            .await
3782            .insert(peer2_id.clone());
3783
3784        // Test: List all active connections
3785        let connections = node.list_active_connections().await;
3786        assert_eq!(connections.len(), 2);
3787
3788        // Verify peer1 and peer2 are in the list
3789        let peer1_conn = connections.iter().find(|(id, _)| id == &peer1_id);
3790        let peer2_conn = connections.iter().find(|(id, _)| id == &peer2_id);
3791
3792        assert!(peer1_conn.is_some());
3793        assert!(peer2_conn.is_some());
3794
3795        // Verify addresses match
3796        assert_eq!(peer1_conn.unwrap().1, peer1_addrs);
3797        assert_eq!(peer2_conn.unwrap().1, peer2_addrs);
3798
3799        Ok(())
3800    }
3801
3802    #[tokio::test]
3803    async fn test_remove_peer_success() -> Result<()> {
3804        let config = create_test_node_config();
3805        let node = P2PNode::new(config).await?;
3806
3807        // Add a peer
3808        let peer_id = "peer_to_remove".to_string();
3809        let peer_info = PeerInfo {
3810            peer_id: peer_id.clone(),
3811            addresses: vec!["192.168.1.100:9000".to_string()],
3812            connected_at: Instant::now(),
3813            last_seen: Instant::now(),
3814            status: ConnectionStatus::Connected,
3815            protocols: vec!["test-protocol".to_string()],
3816            heartbeat_count: 0,
3817        };
3818
3819        node.peers.write().await.insert(peer_id.clone(), peer_info);
3820
3821        // Verify peer exists
3822        assert!(node.is_peer_connected(&peer_id).await);
3823
3824        // Remove the peer
3825        let removed = node.remove_peer(&peer_id).await;
3826        assert!(removed);
3827
3828        // Verify peer no longer exists
3829        assert!(!node.is_peer_connected(&peer_id).await);
3830
3831        Ok(())
3832    }
3833
3834    #[tokio::test]
3835    async fn test_remove_peer_nonexistent() -> Result<()> {
3836        let config = create_test_node_config();
3837        let node = P2PNode::new(config).await?;
3838
3839        // Try to remove a peer that doesn't exist
3840        let removed = node.remove_peer(&"nonexistent_peer".to_string()).await;
3841        assert!(!removed);
3842
3843        Ok(())
3844    }
3845
3846    #[tokio::test]
3847    async fn test_is_peer_connected() -> Result<()> {
3848        let config = create_test_node_config();
3849        let node = P2PNode::new(config).await?;
3850
3851        let peer_id = "test_peer".to_string();
3852
3853        // Initially not connected
3854        assert!(!node.is_peer_connected(&peer_id).await);
3855
3856        // Add peer
3857        let peer_info = PeerInfo {
3858            peer_id: peer_id.clone(),
3859            addresses: vec!["192.168.1.100:9000".to_string()],
3860            connected_at: Instant::now(),
3861            last_seen: Instant::now(),
3862            status: ConnectionStatus::Connected,
3863            protocols: vec!["test-protocol".to_string()],
3864            heartbeat_count: 0,
3865        };
3866
3867        node.peers.write().await.insert(peer_id.clone(), peer_info);
3868
3869        // Now connected
3870        assert!(node.is_peer_connected(&peer_id).await);
3871
3872        // Remove peer
3873        node.remove_peer(&peer_id).await;
3874
3875        // No longer connected
3876        assert!(!node.is_peer_connected(&peer_id).await);
3877
3878        Ok(())
3879    }
3880
3881    #[test]
3882    fn test_normalize_ipv6_wildcard() {
3883        use std::net::{IpAddr, Ipv6Addr, SocketAddr};
3884
3885        let wildcard = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 8080);
3886        let normalized = normalize_wildcard_to_loopback(wildcard);
3887
3888        assert_eq!(normalized.ip(), IpAddr::V6(Ipv6Addr::LOCALHOST));
3889        assert_eq!(normalized.port(), 8080);
3890    }
3891
3892    #[test]
3893    fn test_normalize_ipv4_wildcard() {
3894        use std::net::{IpAddr, Ipv4Addr, SocketAddr};
3895
3896        let wildcard = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9000);
3897        let normalized = normalize_wildcard_to_loopback(wildcard);
3898
3899        assert_eq!(normalized.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
3900        assert_eq!(normalized.port(), 9000);
3901    }
3902
3903    #[test]
3904    fn test_normalize_specific_address_unchanged() {
3905        let specific: std::net::SocketAddr = "192.168.1.100:3000".parse().unwrap();
3906        let normalized = normalize_wildcard_to_loopback(specific);
3907
3908        assert_eq!(normalized, specific);
3909    }
3910
3911    #[test]
3912    fn test_normalize_loopback_unchanged() {
3913        use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
3914
3915        let loopback_v6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5000);
3916        let normalized_v6 = normalize_wildcard_to_loopback(loopback_v6);
3917        assert_eq!(normalized_v6, loopback_v6);
3918
3919        let loopback_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
3920        let normalized_v4 = normalize_wildcard_to_loopback(loopback_v4);
3921        assert_eq!(normalized_v4, loopback_v4);
3922    }
3923}