saorsa_core/
network.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: david@saorsalabs.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! Network module
15//!
16//! This module provides core networking functionality for the P2P Foundation.
17//! It handles peer connections, network events, and node lifecycle management.
18
19use crate::bgp_geo_provider::BgpGeoProvider;
20use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
21use crate::config::Config;
22use crate::control::RejectionMessage;
23use crate::dht::DHT;
24use crate::error::{NetworkError, P2PError, P2pResult as Result};
25use crate::identity::rejection::RejectionReason;
26use crate::security::GeoProvider;
27
28use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
29use crate::transport::ant_quic_adapter::DualStackNetworkNode;
30#[allow(unused_imports)] // Temporarily unused during migration
31use crate::transport::{TransportOptions, TransportType};
32use crate::validation::RateLimitConfig;
33use crate::validation::RateLimiter;
34use crate::{NetworkAddress, PeerId};
35use serde::{Deserialize, Serialize};
36use std::collections::{HashMap, HashSet};
37use std::sync::Arc;
38use std::sync::atomic::{AtomicBool, Ordering};
39use std::time::Duration;
40use tokio::sync::{RwLock, broadcast};
41use tokio::time::Instant;
42use tracing::{debug, error, info, trace, warn};
43
44/// Configuration for a P2P node
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct NodeConfig {
47    /// Local peer ID for this node
48    pub peer_id: Option<PeerId>,
49
50    /// Addresses to listen on for incoming connections
51    pub listen_addrs: Vec<std::net::SocketAddr>,
52
53    /// Primary listen address (for compatibility)
54    pub listen_addr: std::net::SocketAddr,
55
56    /// Bootstrap peers to connect to on startup (legacy)
57    pub bootstrap_peers: Vec<std::net::SocketAddr>,
58
59    /// Bootstrap peers as strings (for integration tests)
60    pub bootstrap_peers_str: Vec<String>,
61
62    /// Enable IPv6 support
63    pub enable_ipv6: bool,
64
65    // MCP removed; will be redesigned later
66    /// Connection timeout duration
67    pub connection_timeout: Duration,
68
69    /// Keep-alive interval for connections
70    pub keep_alive_interval: Duration,
71
72    /// Maximum number of concurrent connections
73    pub max_connections: usize,
74
75    /// Maximum number of incoming connections
76    pub max_incoming_connections: usize,
77
78    /// DHT configuration
79    pub dht_config: DHTConfig,
80
81    /// Security configuration
82    pub security_config: SecurityConfig,
83
84    /// Production hardening configuration
85    pub production_config: Option<ProductionConfig>,
86
87    /// Bootstrap cache configuration
88    pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,
89
90    /// Optional IP diversity configuration for Sybil protection tuning.
91    ///
92    /// When set, this configuration is used by bootstrap peer discovery and
93    /// other diversity-enforcing subsystems. If `None`, defaults are used.
94    pub diversity_config: Option<crate::security::IPDiversityConfig>,
95
96    /// Attestation configuration for software integrity verification.
97    ///
98    /// Controls how nodes verify each other's software attestation during handshake.
99    /// In Phase 1, this is used for "soft enforcement" (logging only).
100    #[serde(default)]
101    pub attestation_config: crate::attestation::AttestationConfig,
102}
103
104/// DHT-specific configuration
105#[derive(Debug, Clone, Serialize, Deserialize)]
106pub struct DHTConfig {
107    /// Kademlia K parameter (bucket size)
108    pub k_value: usize,
109
110    /// Kademlia alpha parameter (parallelism)
111    pub alpha_value: usize,
112
113    /// DHT record TTL
114    pub record_ttl: Duration,
115
116    /// DHT refresh interval
117    pub refresh_interval: Duration,
118}
119
120/// Security configuration
121#[derive(Debug, Clone, Serialize, Deserialize)]
122pub struct SecurityConfig {
123    /// Enable noise protocol for encryption
124    pub enable_noise: bool,
125
126    /// Enable TLS for secure transport
127    pub enable_tls: bool,
128
129    /// Trust level for peer verification
130    pub trust_level: TrustLevel,
131}
132
133/// Trust level for peer verification
134#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
135pub enum TrustLevel {
136    /// No verification required
137    None,
138    /// Basic peer ID verification
139    Basic,
140    /// Full cryptographic verification
141    Full,
142}
143
144// ============================================================================
145// Address Construction Helpers
146// ============================================================================
147
148/// Build listen addresses based on port and IPv6 preference
149///
150/// This helper consolidates the duplicated address construction logic.
151#[inline]
152fn build_listen_addrs(port: u16, ipv6_enabled: bool) -> Vec<std::net::SocketAddr> {
153    let mut addrs = Vec::with_capacity(if ipv6_enabled { 2 } else { 1 });
154
155    if ipv6_enabled {
156        addrs.push(std::net::SocketAddr::new(
157            std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
158            port,
159        ));
160    }
161
162    addrs.push(std::net::SocketAddr::new(
163        std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
164        port,
165    ));
166
167    addrs
168}
169
170impl NodeConfig {
171    /// Create a new NodeConfig with default values
172    ///
173    /// # Errors
174    ///
175    /// Returns an error if default addresses cannot be parsed
176    pub fn new() -> Result<Self> {
177        let config = Config::default();
178        let listen_addr = config.listen_socket_addr()?;
179
180        Ok(Self {
181            peer_id: None,
182            listen_addrs: build_listen_addrs(listen_addr.port(), config.network.ipv6_enabled),
183            listen_addr,
184            bootstrap_peers: Vec::new(),
185            bootstrap_peers_str: config.network.bootstrap_nodes.clone(),
186            enable_ipv6: config.network.ipv6_enabled,
187            connection_timeout: Duration::from_secs(config.network.connection_timeout),
188            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
189            max_connections: config.network.max_connections,
190            max_incoming_connections: config.security.connection_limit as usize,
191            dht_config: DHTConfig::default(),
192            security_config: SecurityConfig::default(),
193            production_config: None,
194            bootstrap_cache_config: None,
195            diversity_config: None,
196            attestation_config: config.attestation.clone(),
197        })
198    }
199
200    /// Create a builder for customized NodeConfig construction
201    pub fn builder() -> NodeConfigBuilder {
202        NodeConfigBuilder::default()
203    }
204}
205
206// ============================================================================
207// NodeConfig Builder Pattern
208// ============================================================================
209
210/// Builder for constructing NodeConfig with fluent API
211#[derive(Debug, Clone, Default)]
212pub struct NodeConfigBuilder {
213    peer_id: Option<PeerId>,
214    listen_port: Option<u16>,
215    enable_ipv6: Option<bool>,
216    bootstrap_peers: Vec<std::net::SocketAddr>,
217    max_connections: Option<usize>,
218    connection_timeout: Option<Duration>,
219    keep_alive_interval: Option<Duration>,
220    dht_config: Option<DHTConfig>,
221    security_config: Option<SecurityConfig>,
222    production_config: Option<ProductionConfig>,
223}
224
225impl NodeConfigBuilder {
226    /// Set the peer ID
227    pub fn peer_id(mut self, peer_id: PeerId) -> Self {
228        self.peer_id = Some(peer_id);
229        self
230    }
231
232    /// Set the listen port
233    pub fn listen_port(mut self, port: u16) -> Self {
234        self.listen_port = Some(port);
235        self
236    }
237
238    /// Enable or disable IPv6
239    pub fn ipv6(mut self, enabled: bool) -> Self {
240        self.enable_ipv6 = Some(enabled);
241        self
242    }
243
244    /// Add a bootstrap peer
245    pub fn bootstrap_peer(mut self, addr: std::net::SocketAddr) -> Self {
246        self.bootstrap_peers.push(addr);
247        self
248    }
249
250    /// Set maximum connections
251    pub fn max_connections(mut self, max: usize) -> Self {
252        self.max_connections = Some(max);
253        self
254    }
255
256    /// Set connection timeout
257    pub fn connection_timeout(mut self, timeout: Duration) -> Self {
258        self.connection_timeout = Some(timeout);
259        self
260    }
261
262    /// Set keep-alive interval
263    pub fn keep_alive_interval(mut self, interval: Duration) -> Self {
264        self.keep_alive_interval = Some(interval);
265        self
266    }
267
268    /// Set DHT configuration
269    pub fn dht_config(mut self, config: DHTConfig) -> Self {
270        self.dht_config = Some(config);
271        self
272    }
273
274    /// Set security configuration
275    pub fn security_config(mut self, config: SecurityConfig) -> Self {
276        self.security_config = Some(config);
277        self
278    }
279
280    /// Set production configuration
281    pub fn production_config(mut self, config: ProductionConfig) -> Self {
282        self.production_config = Some(config);
283        self
284    }
285
286    /// Build the NodeConfig
287    ///
288    /// # Errors
289    ///
290    /// Returns an error if address construction fails
291    pub fn build(self) -> Result<NodeConfig> {
292        let base_config = Config::default();
293        let default_port = base_config
294            .listen_socket_addr()
295            .map(|addr| addr.port())
296            .unwrap_or(9000);
297        let port = self.listen_port.unwrap_or(default_port);
298        let ipv6_enabled = self.enable_ipv6.unwrap_or(base_config.network.ipv6_enabled);
299
300        let listen_addr =
301            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), port);
302
303        Ok(NodeConfig {
304            peer_id: self.peer_id,
305            listen_addrs: build_listen_addrs(port, ipv6_enabled),
306            listen_addr,
307            bootstrap_peers: self.bootstrap_peers.clone(),
308            bootstrap_peers_str: self.bootstrap_peers.iter().map(|a| a.to_string()).collect(),
309            enable_ipv6: ipv6_enabled,
310            connection_timeout: self
311                .connection_timeout
312                .unwrap_or(Duration::from_secs(base_config.network.connection_timeout)),
313            keep_alive_interval: self
314                .keep_alive_interval
315                .unwrap_or(Duration::from_secs(base_config.network.keepalive_interval)),
316            max_connections: self
317                .max_connections
318                .unwrap_or(base_config.network.max_connections),
319            max_incoming_connections: base_config.security.connection_limit as usize,
320            dht_config: self.dht_config.unwrap_or_default(),
321            security_config: self.security_config.unwrap_or_default(),
322            production_config: self.production_config,
323            bootstrap_cache_config: None,
324            diversity_config: None,
325            attestation_config: base_config.attestation.clone(),
326        })
327    }
328}
329
330impl Default for NodeConfig {
331    fn default() -> Self {
332        let config = Config::default();
333        let listen_addr = config.listen_socket_addr().unwrap_or_else(|_| {
334            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 9000)
335        });
336
337        Self {
338            peer_id: None,
339            listen_addrs: build_listen_addrs(listen_addr.port(), config.network.ipv6_enabled),
340            listen_addr,
341            bootstrap_peers: Vec::new(),
342            bootstrap_peers_str: Vec::new(),
343            enable_ipv6: config.network.ipv6_enabled,
344            connection_timeout: Duration::from_secs(config.network.connection_timeout),
345            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
346            max_connections: config.network.max_connections,
347            max_incoming_connections: config.security.connection_limit as usize,
348            dht_config: DHTConfig::default(),
349            security_config: SecurityConfig::default(),
350            production_config: None,
351            bootstrap_cache_config: None,
352            diversity_config: None,
353            attestation_config: config.attestation.clone(),
354        }
355    }
356}
357
358impl NodeConfig {
359    /// Create NodeConfig from Config
360    pub fn from_config(config: &Config) -> Result<Self> {
361        let listen_addr = config.listen_socket_addr()?;
362        let bootstrap_addrs = config.bootstrap_addrs()?;
363
364        let mut node_config = Self {
365            peer_id: None,
366            listen_addrs: vec![listen_addr],
367            listen_addr,
368            bootstrap_peers: bootstrap_addrs
369                .iter()
370                .map(|addr| addr.socket_addr())
371                .collect(),
372            bootstrap_peers_str: config
373                .network
374                .bootstrap_nodes
375                .iter()
376                .map(|addr| addr.to_string())
377                .collect(),
378            enable_ipv6: config.network.ipv6_enabled,
379
380            connection_timeout: Duration::from_secs(config.network.connection_timeout),
381            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
382            max_connections: config.network.max_connections,
383            max_incoming_connections: config.security.connection_limit as usize,
384            dht_config: DHTConfig {
385                k_value: 20,
386                alpha_value: 3,
387                record_ttl: Duration::from_secs(3600),
388                refresh_interval: Duration::from_secs(900),
389            },
390            security_config: SecurityConfig {
391                enable_noise: true,
392                enable_tls: true,
393                trust_level: TrustLevel::Basic,
394            },
395            production_config: Some(ProductionConfig {
396                max_connections: config.network.max_connections,
397                max_memory_bytes: 0,  // unlimited
398                max_bandwidth_bps: 0, // unlimited
399                connection_timeout: Duration::from_secs(config.network.connection_timeout),
400                keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
401                health_check_interval: Duration::from_secs(30),
402                metrics_interval: Duration::from_secs(60),
403                enable_performance_tracking: true,
404                enable_auto_cleanup: true,
405                shutdown_timeout: Duration::from_secs(30),
406                rate_limits: crate::production::RateLimitConfig::default(),
407            }),
408            bootstrap_cache_config: None,
409            diversity_config: None,
410            attestation_config: config.attestation.clone(),
411        };
412
413        // Add IPv6 listen address if enabled
414        if config.network.ipv6_enabled {
415            node_config.listen_addrs.push(std::net::SocketAddr::new(
416                std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
417                listen_addr.port(),
418            ));
419        }
420
421        Ok(node_config)
422    }
423
424    /// Try to build a NodeConfig from a listen address string
425    pub fn with_listen_addr(addr: &str) -> Result<Self> {
426        let listen_addr: std::net::SocketAddr = addr
427            .parse()
428            .map_err(|e: std::net::AddrParseError| {
429                NetworkError::InvalidAddress(e.to_string().into())
430            })
431            .map_err(P2PError::Network)?;
432        let cfg = NodeConfig {
433            listen_addr,
434            listen_addrs: vec![listen_addr],
435            diversity_config: None,
436            ..Default::default()
437        };
438        Ok(cfg)
439    }
440}
441
442impl Default for DHTConfig {
443    fn default() -> Self {
444        Self {
445            k_value: 20,
446            alpha_value: 5,
447            record_ttl: Duration::from_secs(3600), // 1 hour
448            refresh_interval: Duration::from_secs(600), // 10 minutes
449        }
450    }
451}
452
453impl Default for SecurityConfig {
454    fn default() -> Self {
455        Self {
456            enable_noise: true,
457            enable_tls: true,
458            trust_level: TrustLevel::Basic,
459        }
460    }
461}
462
463/// Information about a connected peer
464#[derive(Debug, Clone)]
465pub struct PeerInfo {
466    /// Peer identifier
467    pub peer_id: PeerId,
468
469    /// Peer's addresses
470    pub addresses: Vec<String>,
471
472    /// Connection timestamp
473    pub connected_at: Instant,
474
475    /// Last seen timestamp
476    pub last_seen: Instant,
477
478    /// Connection status
479    pub status: ConnectionStatus,
480
481    /// Supported protocols
482    pub protocols: Vec<String>,
483
484    /// Number of heartbeats received
485    pub heartbeat_count: u64,
486}
487
488/// Connection status for a peer
489#[derive(Debug, Clone, PartialEq)]
490pub enum ConnectionStatus {
491    /// Connection is being established
492    Connecting,
493    /// Connection is established and active
494    Connected,
495    /// Connection is being closed
496    Disconnecting,
497    /// Connection is closed
498    Disconnected,
499    /// Connection failed
500    Failed(String),
501}
502
503/// Network events that can occur
504#[derive(Debug, Clone)]
505pub enum NetworkEvent {
506    /// A new peer has connected
507    PeerConnected {
508        /// The identifier of the newly connected peer
509        peer_id: PeerId,
510        /// The network addresses where the peer can be reached
511        addresses: Vec<String>,
512    },
513
514    /// A peer has disconnected
515    PeerDisconnected {
516        /// The identifier of the disconnected peer
517        peer_id: PeerId,
518        /// The reason for the disconnection
519        reason: String,
520    },
521
522    /// A message was received from a peer
523    MessageReceived {
524        /// The identifier of the sending peer
525        peer_id: PeerId,
526        /// The protocol used for the message
527        protocol: String,
528        /// The raw message data
529        data: Vec<u8>,
530    },
531
532    /// A connection attempt failed
533    ConnectionFailed {
534        /// The identifier of the peer (if known)
535        peer_id: Option<PeerId>,
536        /// The address where connection was attempted
537        address: String,
538        /// The error message describing the failure
539        error: String,
540    },
541
542    /// DHT record was stored
543    DHTRecordStored {
544        /// The DHT key where the record was stored
545        key: Vec<u8>,
546        /// The value that was stored
547        value: Vec<u8>,
548    },
549
550    /// DHT record was retrieved
551    DHTRecordRetrieved {
552        /// The DHT key that was queried
553        key: Vec<u8>,
554        /// The retrieved value, if found
555        value: Option<Vec<u8>>,
556    },
557}
558
559/// Network events that can occur in the P2P system
560///
561/// Events are broadcast to all listeners and provide real-time
562/// notifications of network state changes and message arrivals.
563#[derive(Debug, Clone)]
564pub enum P2PEvent {
565    /// Message received from a peer on a specific topic
566    Message {
567        /// Topic or channel the message was sent on
568        topic: String,
569        /// Peer ID of the message sender
570        source: PeerId,
571        /// Raw message data payload
572        data: Vec<u8>,
573    },
574    /// A new peer has connected to the network
575    PeerConnected(PeerId),
576    /// A peer has disconnected from the network
577    PeerDisconnected(PeerId),
578}
579
580/// Main P2P node structure
581/// Main P2P network node that manages connections, routing, and communication
582///
583/// This struct represents a complete P2P network participant that can:
584/// - Connect to other peers via QUIC transport
585/// - Participate in distributed hash table (DHT) operations
586/// - Send and receive messages through various protocols
587/// - Handle network events and peer lifecycle
588/// - Provide MCP (Model Context Protocol) services
589pub struct P2PNode {
590    /// Node configuration
591    config: NodeConfig,
592
593    /// Our peer ID
594    peer_id: PeerId,
595
596    /// Connected peers
597    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
598
599    /// Network event broadcaster
600    event_tx: broadcast::Sender<P2PEvent>,
601
602    /// Listen addresses
603    listen_addrs: RwLock<Vec<std::net::SocketAddr>>,
604
605    /// Node start time
606    start_time: Instant,
607
608    /// Running state
609    running: RwLock<bool>,
610
611    /// DHT instance (optional)
612    dht: Option<Arc<RwLock<DHT>>>,
613
614    /// Production resource manager (optional)
615    resource_manager: Option<Arc<ResourceManager>>,
616
617    /// Bootstrap cache manager for peer discovery
618    bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,
619
620    /// Dual-stack ant-quic nodes (IPv6 + IPv4) with Happy Eyeballs dialing
621    dual_node: Arc<DualStackNetworkNode>,
622
623    /// Rate limiter for connection and request throttling
624    #[allow(dead_code)]
625    rate_limiter: Arc<RateLimiter>,
626
627    /// Active connections (tracked by peer_id)
628    /// This set is synchronized with ant-quic's connection state via event monitoring
629    active_connections: Arc<RwLock<HashSet<PeerId>>>,
630
631    /// Security dashboard for monitoring
632    pub security_dashboard: Option<Arc<crate::dht::metrics::SecurityDashboard>>,
633
634    /// Connection lifecycle monitor task handle
635    #[allow(dead_code)]
636    connection_monitor_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
637
638    /// Keepalive task handle
639    #[allow(dead_code)]
640    keepalive_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
641
642    /// Shutdown flag for background tasks
643    #[allow(dead_code)]
644    shutdown: Arc<AtomicBool>,
645
646    /// GeoIP provider for connection validation
647    #[allow(dead_code)]
648    geo_provider: Arc<BgpGeoProvider>,
649
650    /// This node's entangled identity (derived from public key + binary hash + nonce).
651    /// Used for software attestation verification during peer handshake.
652    entangled_id: Option<crate::attestation::EntangledId>,
653
654    /// BLAKE3 hash of the running binary for attestation.
655    /// In production, this is computed at startup from the executable.
656    binary_hash: [u8; 32],
657}
658
659/// Normalize wildcard bind addresses to localhost loopback addresses
660///
661/// ant-quic correctly rejects "unspecified" addresses (0.0.0.0 and [::]) for remote connections
662/// because you cannot connect TO an unspecified address - these are only valid for BINDING.
663///
664/// This function converts wildcard addresses to appropriate loopback addresses for local connections:
665/// - IPv6 [::]:port → ::1:port (IPv6 loopback)
666/// - IPv4 0.0.0.0:port → 127.0.0.1:port (IPv4 loopback)
667/// - All other addresses pass through unchanged
668///
669/// # Arguments
670/// * `addr` - The SocketAddr to normalize
671///
672/// # Returns
673/// * Normalized SocketAddr suitable for remote connections
674fn normalize_wildcard_to_loopback(addr: std::net::SocketAddr) -> std::net::SocketAddr {
675    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
676
677    if addr.ip().is_unspecified() {
678        // Convert unspecified addresses to loopback
679        let loopback_ip = match addr {
680            std::net::SocketAddr::V6(_) => IpAddr::V6(Ipv6Addr::LOCALHOST), // ::1
681            std::net::SocketAddr::V4(_) => IpAddr::V4(Ipv4Addr::LOCALHOST), // 127.0.0.1
682        };
683        std::net::SocketAddr::new(loopback_ip, addr.port())
684    } else {
685        // Not a wildcard address, pass through unchanged
686        addr
687    }
688}
689
690impl P2PNode {
691    /// Minimal constructor for tests that avoids real networking
692    pub fn new_for_tests() -> Result<Self> {
693        let (event_tx, _) = broadcast::channel(16);
694        Ok(Self {
695            config: NodeConfig::default(),
696            peer_id: "test_peer".to_string(),
697            peers: Arc::new(RwLock::new(HashMap::new())),
698            event_tx,
699            listen_addrs: RwLock::new(Vec::new()),
700            start_time: Instant::now(),
701            running: RwLock::new(false),
702            dht: None,
703            resource_manager: None,
704            bootstrap_manager: None,
705            dual_node: {
706                // Bind dual-stack nodes on ephemeral ports for tests
707                let v6: Option<std::net::SocketAddr> = "[::1]:0"
708                    .parse()
709                    .ok()
710                    .or(Some(std::net::SocketAddr::from(([0, 0, 0, 0], 0))));
711                let v4: Option<std::net::SocketAddr> = "127.0.0.1:0".parse().ok();
712                let handle = tokio::runtime::Handle::current();
713                let dual_attempt = handle.block_on(
714                    crate::transport::ant_quic_adapter::DualStackNetworkNode::new(v6, v4),
715                );
716                let dual = match dual_attempt {
717                    Ok(d) => d,
718                    Err(_e1) => {
719                        // Fallback to IPv4-only ephemeral bind
720                        let fallback = handle.block_on(
721                            crate::transport::ant_quic_adapter::DualStackNetworkNode::new(
722                                None,
723                                "127.0.0.1:0".parse().ok(),
724                            ),
725                        );
726                        match fallback {
727                            Ok(d) => d,
728                            Err(e2) => {
729                                return Err(P2PError::Network(NetworkError::BindError(
730                                    format!("Failed to create dual-stack network node: {}", e2)
731                                        .into(),
732                                )));
733                            }
734                        }
735                    }
736                };
737                Arc::new(dual)
738            },
739            rate_limiter: Arc::new(RateLimiter::new(RateLimitConfig {
740                max_requests: 100,
741                burst_size: 100,
742                window: std::time::Duration::from_secs(1),
743                ..Default::default()
744            })),
745            active_connections: Arc::new(RwLock::new(HashSet::new())),
746            connection_monitor_handle: Arc::new(RwLock::new(None)),
747            keepalive_handle: Arc::new(RwLock::new(None)),
748            shutdown: Arc::new(AtomicBool::new(false)),
749            geo_provider: Arc::new(BgpGeoProvider::new()),
750            security_dashboard: None,
751            // Attestation fields - use dummy values for tests
752            entangled_id: None,
753            binary_hash: [0u8; 32],
754        })
755    }
756    /// Create a new P2P node with the given configuration
757    pub async fn new(config: NodeConfig) -> Result<Self> {
758        let peer_id = config.peer_id.clone().unwrap_or_else(|| {
759            // Generate a random peer ID for now
760            format!("peer_{}", &uuid::Uuid::new_v4().to_string()[..8])
761        });
762
763        let (event_tx, _) = broadcast::channel(1000);
764
765        // Initialize and register a TrustWeightedKademlia DHT for the global API
766        // Use a deterministic local NodeId derived from the peer_id
767        {
768            use blake3::Hasher;
769            let mut hasher = Hasher::new();
770            hasher.update(peer_id.as_bytes());
771            let digest = hasher.finalize();
772            let mut nid = [0u8; 32];
773            nid.copy_from_slice(digest.as_bytes());
774            let _twdht = std::sync::Arc::new(crate::dht::TrustWeightedKademlia::new(
775                crate::identity::node_identity::NodeId::from_bytes(nid),
776            ));
777            // TODO: Update to use new clean API
778            // let _ = crate::api::set_dht_instance(twdht);
779        }
780
781        // Initialize DHT if needed
782        let (dht, security_dashboard) = if true {
783            // Assuming DHT is always enabled for now, or check config
784            let _dht_config = crate::dht::DHTConfig {
785                replication_factor: config.dht_config.k_value,
786                bucket_size: config.dht_config.k_value,
787                alpha: config.dht_config.alpha_value,
788                record_ttl: config.dht_config.record_ttl,
789                bucket_refresh_interval: config.dht_config.refresh_interval,
790                republish_interval: config.dht_config.refresh_interval,
791                max_distance: 160,
792            };
793            // Convert peer_id String to NodeId
794            let peer_bytes = peer_id.as_bytes();
795            let mut node_id_bytes = [0u8; 32];
796            let len = peer_bytes.len().min(32);
797            node_id_bytes[..len].copy_from_slice(&peer_bytes[..len]);
798            let node_id = crate::dht::core_engine::NodeId::from_bytes(node_id_bytes);
799            let dht_instance = DHT::new(node_id).map_err(|e| {
800                crate::error::P2PError::Dht(crate::error::DhtError::StoreFailed(
801                    e.to_string().into(),
802                ))
803            })?;
804            dht_instance.start_maintenance_tasks();
805
806            // Create Security Dashboard
807            let security_metrics = dht_instance.security_metrics();
808            let dashboard = crate::dht::metrics::SecurityDashboard::new(
809                security_metrics,
810                Arc::new(crate::dht::metrics::DhtMetricsCollector::new()),
811                Arc::new(crate::dht::metrics::TrustMetricsCollector::new()),
812                Arc::new(crate::dht::metrics::PlacementMetricsCollector::new()),
813            );
814
815            (
816                Some(Arc::new(RwLock::new(dht_instance))),
817                Some(Arc::new(dashboard)),
818            )
819        } else {
820            (None, None)
821        };
822
823        // MCP removed
824
825        // Initialize production resource manager if configured
826        let resource_manager = config
827            .production_config
828            .clone()
829            .map(|prod_config| Arc::new(ResourceManager::new(prod_config)));
830
831        // Initialize bootstrap cache manager
832        let diversity_config = config.diversity_config.clone().unwrap_or_default();
833        let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
834            match BootstrapManager::with_full_config(
835                cache_config.clone(),
836                crate::rate_limit::JoinRateLimiterConfig::default(),
837                diversity_config.clone(),
838            )
839            .await
840            {
841                Ok(manager) => Some(Arc::new(RwLock::new(manager))),
842                Err(e) => {
843                    warn!(
844                        "Failed to initialize bootstrap manager: {}, continuing without cache",
845                        e
846                    );
847                    None
848                }
849            }
850        } else {
851            match BootstrapManager::with_full_config(
852                crate::bootstrap::CacheConfig::default(),
853                crate::rate_limit::JoinRateLimiterConfig::default(),
854                diversity_config,
855            )
856            .await
857            {
858                Ok(manager) => Some(Arc::new(RwLock::new(manager))),
859                Err(e) => {
860                    warn!(
861                        "Failed to initialize bootstrap manager: {}, continuing without cache",
862                        e
863                    );
864                    None
865                }
866            }
867        };
868
869        // Initialize dual-stack ant-quic nodes
870        // Determine bind addresses
871        let (v6_opt, v4_opt) = {
872            let port = config.listen_addr.port();
873            let ip = config.listen_addr.ip();
874
875            let v4_addr = if ip.is_ipv4() {
876                Some(std::net::SocketAddr::new(ip, port))
877            } else {
878                // If config is IPv6, we still might want IPv4 on UNSPECIFIED if dual stack is desired
879                // But for now let's just stick to defaults if not specified
880                Some(std::net::SocketAddr::new(
881                    std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
882                    port,
883                ))
884            };
885
886            let v6_addr = if config.enable_ipv6 {
887                if ip.is_ipv6() {
888                    Some(std::net::SocketAddr::new(ip, port))
889                } else {
890                    Some(std::net::SocketAddr::new(
891                        std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
892                        port,
893                    ))
894                }
895            } else {
896                None
897            };
898            (v6_addr, v4_addr)
899        };
900
901        let dual_node = Arc::new(
902            DualStackNetworkNode::new(v6_opt, v4_opt)
903                .await
904                .map_err(|e| {
905                    P2PError::Transport(crate::error::TransportError::SetupFailed(
906                        format!("Failed to create dual-stack network nodes: {}", e).into(),
907                    ))
908                })?,
909        );
910
911        // Initialize rate limiter with default config
912        let rate_limiter = Arc::new(RateLimiter::new(
913            crate::validation::RateLimitConfig::default(),
914        ));
915
916        // Create active connections tracker
917        let active_connections = Arc::new(RwLock::new(HashSet::new()));
918
919        // Initialize GeoIP provider
920        let geo_provider = Arc::new(BgpGeoProvider::new());
921
922        // Create peers map
923        let peers = Arc::new(RwLock::new(HashMap::new()));
924
925        // Start connection lifecycle monitor
926        // CRITICAL: Subscribe to connection events BEFORE spawning the task
927        // to avoid race condition where early connections are missed
928        let connection_event_rx = dual_node.subscribe_connection_events();
929
930        let connection_monitor_handle = {
931            let active_conns = Arc::clone(&active_connections);
932            let peers_map = Arc::clone(&peers);
933            let event_tx_clone = event_tx.clone();
934            let dual_node_clone = Arc::clone(&dual_node);
935            let geo_provider_clone = Arc::clone(&geo_provider);
936            let peer_id_clone = peer_id.clone();
937
938            let handle = tokio::spawn(async move {
939                Self::connection_lifecycle_monitor_with_rx(
940                    dual_node_clone,
941                    connection_event_rx,
942                    active_conns,
943                    peers_map,
944                    event_tx_clone,
945                    geo_provider_clone,
946                    peer_id_clone,
947                )
948                .await;
949            });
950
951            Arc::new(RwLock::new(Some(handle)))
952        };
953
954        // Spawn keepalive task
955        let shutdown = Arc::new(AtomicBool::new(false));
956        let keepalive_handle = {
957            let active_conns = Arc::clone(&active_connections);
958            let dual_node_clone = Arc::clone(&dual_node);
959            let shutdown_clone = Arc::clone(&shutdown);
960
961            let handle = tokio::spawn(async move {
962                Self::keepalive_task(active_conns, dual_node_clone, shutdown_clone).await;
963            });
964
965            Arc::new(RwLock::new(Some(handle)))
966        };
967
968        // Compute binary hash for attestation (in production, this would be the actual binary)
969        // For now, we use a placeholder that will be replaced during node initialization
970        let binary_hash = Self::compute_binary_hash();
971
972        let node = Self {
973            config,
974            peer_id,
975            peers,
976            event_tx,
977            listen_addrs: RwLock::new(Vec::new()),
978            start_time: Instant::now(),
979            running: RwLock::new(false),
980            dht,
981            resource_manager,
982            bootstrap_manager,
983            dual_node,
984            rate_limiter,
985            active_connections,
986            security_dashboard,
987            connection_monitor_handle,
988            keepalive_handle,
989            shutdown,
990            geo_provider,
991            // Attestation - EntangledId will be derived later when NodeIdentity is available
992            entangled_id: None,
993            binary_hash,
994        };
995        info!("Created P2P node with peer ID: {}", node.peer_id);
996
997        // Start the network listeners to populate listen addresses
998        node.start_network_listeners().await?;
999
1000        // Update the connection monitor with actual peers reference
1001        node.start_connection_monitor().await;
1002
1003        Ok(node)
1004    }
1005
1006    /// Create a new node builder
1007    pub fn builder() -> NodeBuilder {
1008        NodeBuilder::new()
1009    }
1010
1011    /// Get the peer ID of this node
1012    pub fn peer_id(&self) -> &PeerId {
1013        &self.peer_id
1014    }
1015
1016    pub fn local_addr(&self) -> Option<String> {
1017        self.listen_addrs
1018            .try_read()
1019            .ok()
1020            .and_then(|addrs| addrs.first().map(|a| a.to_string()))
1021    }
1022
1023    pub async fn subscribe(&self, topic: &str) -> Result<()> {
1024        // In a real implementation, this would register the topic with the pubsub mechanism.
1025        // For now, we just log it.
1026        info!("Subscribed to topic: {}", topic);
1027        Ok(())
1028    }
1029
1030    pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
1031        info!(
1032            "Publishing message to topic: {} ({} bytes)",
1033            topic,
1034            data.len()
1035        );
1036
1037        // Get list of connected peers
1038        let peer_list: Vec<PeerId> = {
1039            let peers_guard = self.peers.read().await;
1040            peers_guard.keys().cloned().collect()
1041        };
1042
1043        if peer_list.is_empty() {
1044            debug!("No peers connected, message will only be sent to local subscribers");
1045        } else {
1046            // Send message to all connected peers
1047            let mut send_count = 0;
1048            for peer_id in &peer_list {
1049                match self.send_message(peer_id, topic, data.to_vec()).await {
1050                    Ok(_) => {
1051                        send_count += 1;
1052                        debug!("Sent message to peer: {}", peer_id);
1053                    }
1054                    Err(e) => {
1055                        warn!("Failed to send message to peer {}: {}", peer_id, e);
1056                    }
1057                }
1058            }
1059            info!(
1060                "Published message to {}/{} connected peers",
1061                send_count,
1062                peer_list.len()
1063            );
1064        }
1065
1066        // Also send to local subscribers (for local echo and testing)
1067        let event = P2PEvent::Message {
1068            topic: topic.to_string(),
1069            source: self.peer_id.clone(),
1070            data: data.to_vec(),
1071        };
1072        let _ = self.event_tx.send(event);
1073
1074        Ok(())
1075    }
1076
1077    /// Get the node configuration
1078    pub fn config(&self) -> &NodeConfig {
1079        &self.config
1080    }
1081
1082    /// Start the P2P node
1083    pub async fn start(&self) -> Result<()> {
1084        info!("Starting P2P node...");
1085
1086        // Start production resource manager if configured
1087        if let Some(ref resource_manager) = self.resource_manager {
1088            resource_manager.start().await.map_err(|e| {
1089                P2PError::Network(crate::error::NetworkError::ProtocolError(
1090                    format!("Failed to start resource manager: {e}").into(),
1091                ))
1092            })?;
1093            info!("Production resource manager started");
1094        }
1095
1096        // Start bootstrap manager background tasks
1097        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1098            let mut manager = bootstrap_manager.write().await;
1099            manager.start_background_tasks().await.map_err(|e| {
1100                P2PError::Network(crate::error::NetworkError::ProtocolError(
1101                    format!("Failed to start bootstrap manager: {e}").into(),
1102                ))
1103            })?;
1104            info!("Bootstrap cache manager started");
1105        }
1106
1107        // Set running state
1108        *self.running.write().await = true;
1109
1110        // Start listening on configured addresses using transport layer
1111        self.start_network_listeners().await?;
1112
1113        // Log current listen addresses
1114        let listen_addrs = self.listen_addrs.read().await;
1115        info!("P2P node started on addresses: {:?}", *listen_addrs);
1116
1117        // MCP removed
1118
1119        // Start message receiving system
1120        self.start_message_receiving_system().await?;
1121
1122        // Connect to bootstrap peers
1123        self.connect_bootstrap_peers().await?;
1124
1125        Ok(())
1126    }
1127
1128    /// Start network listeners on configured addresses
1129    async fn start_network_listeners(&self) -> Result<()> {
1130        info!("Starting dual-stack listeners (ant-quic)...");
1131        // Update our listen_addrs from the dual node bindings
1132        let addrs = self.dual_node.local_addrs().await.map_err(|e| {
1133            P2PError::Transport(crate::error::TransportError::SetupFailed(
1134                format!("Failed to get local addresses: {}", e).into(),
1135            ))
1136        })?;
1137        {
1138            let mut la = self.listen_addrs.write().await;
1139            *la = addrs.clone();
1140        }
1141
1142        // Spawn a background accept loop that handles incoming connections from either stack
1143        let event_tx = self.event_tx.clone();
1144        let peers = self.peers.clone();
1145        let active_connections = self.active_connections.clone();
1146        let rate_limiter = self.rate_limiter.clone();
1147        let dual = self.dual_node.clone();
1148        tokio::spawn(async move {
1149            loop {
1150                match dual.accept_any().await {
1151                    Ok((ant_peer_id, remote_sock)) => {
1152                        let peer_id =
1153                            crate::transport::ant_quic_adapter::ant_peer_id_to_string(&ant_peer_id);
1154                        let remote_addr = NetworkAddress::from(remote_sock);
1155                        // Optional: basic IP rate limiting
1156                        let _ = rate_limiter.check_ip(&remote_sock.ip());
1157                        let _ = event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1158                        register_new_peer(&peers, &peer_id, &remote_addr).await;
1159                        active_connections.write().await.insert(peer_id);
1160                    }
1161                    Err(e) => {
1162                        warn!("Accept failed: {}", e);
1163                        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1164                    }
1165                }
1166            }
1167        });
1168
1169        info!("Dual-stack listeners active on: {:?}", addrs);
1170        Ok(())
1171    }
1172
1173    /// Start a listener on a specific socket address
1174    #[allow(dead_code)]
1175    async fn start_listener_on_address(&self, addr: std::net::SocketAddr) -> Result<()> {
1176        // use crate::transport::{Transport}; // Unused during migration
1177
1178        // DISABLED during ant-quic migration - TODO: Reimplement using AntQuicAdapter
1179        /*
1180        // Try QUIC first (preferred transport)
1181        match crate::transport::QuicTransport::new(Default::default()) {
1182            Ok(quic_transport) => {
1183                match quic_transport.listen(NetworkAddress::new(addr)).await {
1184                    Ok(listen_addrs) => {
1185                        info!("QUIC listener started on {} -> {:?}", addr, listen_addrs);
1186
1187                        // Store the actual listening addresses in the node
1188                        {
1189                            let mut node_listen_addrs = self.listen_addrs.write().await;
1190                            // Don't clear - accumulate addresses from multiple listeners
1191                            node_listen_addrs.push(listen_addrs.socket_addr());
1192                        }
1193
1194                        // Start accepting connections in background
1195                        self.start_connection_acceptor(
1196                            Arc::new(quic_transport),
1197                            addr,
1198                            crate::transport::TransportType::QUIC
1199                        ).await?;
1200
1201                        return Ok(());
1202                    }
1203                    Err(e) => {
1204                        warn!("Failed to start QUIC listener on {}: {}", addr, e);
1205                    }
1206                }
1207            }
1208            Err(e) => {
1209                warn!("Failed to create QUIC transport for listening: {}", e);
1210            }
1211        }
1212        */
1213
1214        warn!("QUIC transport temporarily disabled during ant-quic migration");
1215        // No TCP fallback - QUIC only
1216        Err(crate::P2PError::Transport(
1217            crate::error::TransportError::SetupFailed(
1218                format!(
1219                    "Failed to start QUIC listener on {addr} - transport disabled during migration"
1220                )
1221                .into(),
1222            ),
1223        ))
1224    }
1225
1226    /// Start connection acceptor background task
1227    #[allow(dead_code)] // Deprecated during ant-quic migration
1228    async fn start_connection_acceptor(
1229        &self,
1230        transport: Arc<dyn crate::transport::Transport>,
1231        addr: std::net::SocketAddr,
1232        transport_type: crate::transport::TransportType,
1233    ) -> Result<()> {
1234        info!(
1235            "Starting connection acceptor for {:?} on {}",
1236            transport_type, addr
1237        );
1238
1239        // Clone necessary data for the background task
1240        let event_tx = self.event_tx.clone();
1241        let _peer_id = self.peer_id.clone();
1242        let peers = Arc::clone(&self.peers);
1243        // ant-quic dual-stack node is managed separately; accept loop started in start_network_listeners
1244
1245        let rate_limiter = Arc::clone(&self.rate_limiter);
1246
1247        // Spawn background task to accept incoming connections
1248        tokio::spawn(async move {
1249            loop {
1250                match transport.accept().await {
1251                    Ok(connection) => {
1252                        let remote_addr = connection.remote_addr();
1253                        let connection_peer_id =
1254                            format!("peer_from_{}", remote_addr.to_string().replace(":", "_"));
1255
1256                        // Apply rate limiting for incoming connections
1257                        let socket_addr = remote_addr.socket_addr();
1258                        if check_rate_limit(&rate_limiter, &socket_addr, &remote_addr).is_err() {
1259                            // Connection dropped automatically when it goes out of scope
1260                            continue;
1261                        }
1262
1263                        info!(
1264                            "Accepted {:?} connection from {} (peer: {})",
1265                            transport_type, remote_addr, connection_peer_id
1266                        );
1267
1268                        // Generate peer connected event
1269                        let _ = event_tx.send(P2PEvent::PeerConnected(connection_peer_id.clone()));
1270
1271                        // Store the peer connection
1272                        register_new_peer(&peers, &connection_peer_id, &remote_addr).await;
1273
1274                        // Spawn task to handle this specific connection's messages
1275                        spawn_connection_handler(
1276                            connection,
1277                            connection_peer_id,
1278                            event_tx.clone(),
1279                            Arc::clone(&peers),
1280                        );
1281                    }
1282                    Err(e) => {
1283                        warn!(
1284                            "Failed to accept {:?} connection on {}: {}",
1285                            transport_type, addr, e
1286                        );
1287
1288                        // Brief pause before retrying to avoid busy loop
1289                        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
1290                    }
1291                }
1292            }
1293        });
1294
1295        info!(
1296            "Connection acceptor background task started for {:?} on {}",
1297            transport_type, addr
1298        );
1299        Ok(())
1300    }
1301
1302    /// Start the message receiving system with background tasks
1303    async fn start_message_receiving_system(&self) -> Result<()> {
1304        info!("Starting message receiving system");
1305        let dual = self.dual_node.clone();
1306        let event_tx = self.event_tx.clone();
1307
1308        tokio::spawn(async move {
1309            loop {
1310                match dual.receive_any().await {
1311                    Ok((_peer_id, bytes)) => {
1312                        // Expect the JSON message wrapper from create_protocol_message
1313                        #[allow(clippy::collapsible_if)]
1314                        if let Ok(value) = serde_json::from_slice::<serde_json::Value>(&bytes) {
1315                            if let (Some(protocol), Some(data), Some(from)) = (
1316                                value.get("protocol").and_then(|v| v.as_str()),
1317                                value.get("data").and_then(|v| v.as_array()),
1318                                value.get("from").and_then(|v| v.as_str()),
1319                            ) {
1320                                let payload: Vec<u8> = data
1321                                    .iter()
1322                                    .filter_map(|v| v.as_u64().map(|n| n as u8))
1323                                    .collect();
1324                                let _ = event_tx.send(P2PEvent::Message {
1325                                    topic: protocol.to_string(),
1326                                    source: from.to_string(),
1327                                    data: payload,
1328                                });
1329                            }
1330                        }
1331                    }
1332                    Err(e) => {
1333                        warn!("Receive error: {}", e);
1334                        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1335                    }
1336                }
1337            }
1338        });
1339
1340        Ok(())
1341    }
1342
1343    /// Handle a received message and generate appropriate events
1344    #[allow(dead_code)]
1345    async fn handle_received_message(
1346        &self,
1347        message_data: Vec<u8>,
1348        peer_id: &PeerId,
1349        _protocol: &str,
1350        event_tx: &broadcast::Sender<P2PEvent>,
1351    ) -> Result<()> {
1352        // MCP removed: no special protocol handling
1353
1354        // Parse the message format we created in create_protocol_message
1355        match serde_json::from_slice::<serde_json::Value>(&message_data) {
1356            Ok(message) => {
1357                if let (Some(protocol), Some(data), Some(from)) = (
1358                    message.get("protocol").and_then(|v| v.as_str()),
1359                    message.get("data").and_then(|v| v.as_array()),
1360                    message.get("from").and_then(|v| v.as_str()),
1361                ) {
1362                    // Convert data array back to bytes
1363                    let data_bytes: Vec<u8> = data
1364                        .iter()
1365                        .filter_map(|v| v.as_u64().map(|n| n as u8))
1366                        .collect();
1367
1368                    // Generate message event
1369                    let event = P2PEvent::Message {
1370                        topic: protocol.to_string(),
1371                        source: from.to_string(),
1372                        data: data_bytes,
1373                    };
1374
1375                    let _ = event_tx.send(event);
1376                    debug!("Generated message event from peer: {}", peer_id);
1377                }
1378            }
1379            Err(e) => {
1380                warn!("Failed to parse received message from {}: {}", peer_id, e);
1381            }
1382        }
1383
1384        Ok(())
1385    }
1386
1387    // MCP removed
1388
1389    // MCP removed
1390
1391    /// Run the P2P node (blocks until shutdown)
1392    pub async fn run(&self) -> Result<()> {
1393        if !*self.running.read().await {
1394            self.start().await?;
1395        }
1396
1397        info!("P2P node running...");
1398
1399        // Main event loop
1400        loop {
1401            if !*self.running.read().await {
1402                break;
1403            }
1404
1405            // Perform periodic tasks
1406            self.periodic_tasks().await?;
1407
1408            // Sleep for a short interval
1409            tokio::time::sleep(Duration::from_millis(100)).await;
1410        }
1411
1412        info!("P2P node stopped");
1413        Ok(())
1414    }
1415
1416    /// Stop the P2P node
1417    pub async fn stop(&self) -> Result<()> {
1418        info!("Stopping P2P node...");
1419
1420        // Set running state to false
1421        *self.running.write().await = false;
1422
1423        // Disconnect all peers
1424        self.disconnect_all_peers().await?;
1425
1426        // Shutdown production resource manager if configured
1427        if let Some(ref resource_manager) = self.resource_manager {
1428            resource_manager.shutdown().await.map_err(|e| {
1429                P2PError::Network(crate::error::NetworkError::ProtocolError(
1430                    format!("Failed to shutdown resource manager: {e}").into(),
1431                ))
1432            })?;
1433            info!("Production resource manager stopped");
1434        }
1435
1436        info!("P2P node stopped");
1437        Ok(())
1438    }
1439
1440    /// Graceful shutdown alias for tests
1441    pub async fn shutdown(&self) -> Result<()> {
1442        self.stop().await
1443    }
1444
1445    /// Check if the node is running
1446    pub async fn is_running(&self) -> bool {
1447        *self.running.read().await
1448    }
1449
1450    /// Get the current listen addresses
1451    pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
1452        self.listen_addrs.read().await.clone()
1453    }
1454
1455    /// Get connected peers
1456    pub async fn connected_peers(&self) -> Vec<PeerId> {
1457        // "Connected" is defined as currently active at the transport layer.
1458        // The peers map may contain historical peers with Disconnected/Failed status.
1459        self.active_connections
1460            .read()
1461            .await
1462            .iter()
1463            .cloned()
1464            .collect()
1465    }
1466
1467    /// Get peer count
1468    pub async fn peer_count(&self) -> usize {
1469        self.active_connections.read().await.len()
1470    }
1471
1472    /// Get peer info
1473    pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1474        self.peers.read().await.get(peer_id).cloned()
1475    }
1476
1477    /// Get the peer ID for a given socket address, if connected
1478    ///
1479    /// This method searches through all connected peers to find one that has
1480    /// the specified address in its address list.
1481    ///
1482    /// # Arguments
1483    /// * `addr` - The socket address to search for (e.g., "192.168.1.100:9000")
1484    ///
1485    /// # Returns
1486    /// * `Some(PeerId)` - The peer ID if a matching connected peer is found
1487    /// * `None` - If no peer with this address is currently connected
1488    pub async fn get_peer_id_by_address(&self, addr: &str) -> Option<PeerId> {
1489        // Parse the address to a SocketAddr for comparison
1490        let socket_addr: std::net::SocketAddr = addr.parse().ok()?;
1491
1492        let peers = self.peers.read().await;
1493
1494        // Search through all connected peers
1495        for (peer_id, peer_info) in peers.iter() {
1496            // Check if this peer has a matching address
1497            for peer_addr in &peer_info.addresses {
1498                if let Ok(peer_socket) = peer_addr.parse::<std::net::SocketAddr>()
1499                    && peer_socket == socket_addr
1500                {
1501                    return Some(peer_id.clone());
1502                }
1503            }
1504        }
1505
1506        None
1507    }
1508
1509    /// List all active connections with their peer IDs and addresses
1510    ///
1511    /// # Returns
1512    /// A vector of tuples containing (PeerId, Vec<String>) where the Vec<String>
1513    /// contains all known addresses for that peer.
1514    pub async fn list_active_connections(&self) -> Vec<(PeerId, Vec<String>)> {
1515        let active = self.active_connections.read().await;
1516        let peers = self.peers.read().await;
1517
1518        active
1519            .iter()
1520            .map(|peer_id| {
1521                let addresses = peers
1522                    .get(peer_id)
1523                    .map(|info| info.addresses.clone())
1524                    .unwrap_or_default();
1525                (peer_id.clone(), addresses)
1526            })
1527            .collect()
1528    }
1529
1530    /// Remove a peer from the peers map
1531    ///
1532    /// This method removes a peer from the internal peers map. It should be used
1533    /// when a connection is no longer valid (e.g., after detecting that the underlying
1534    /// ant-quic connection has closed).
1535    ///
1536    /// # Arguments
1537    /// * `peer_id` - The ID of the peer to remove
1538    ///
1539    /// # Returns
1540    /// `true` if the peer was found and removed, `false` if the peer was not in the map
1541    pub async fn remove_peer(&self, peer_id: &PeerId) -> bool {
1542        // Remove from active connections tracking
1543        self.active_connections.write().await.remove(peer_id);
1544        // Remove from peers map and return whether it existed
1545        self.peers.write().await.remove(peer_id).is_some()
1546    }
1547
1548    /// Check if a peer is connected
1549    ///
1550    /// This method checks if the peer ID exists in the peers map. Note that this
1551    /// only verifies the peer is registered - it does not guarantee the underlying
1552    /// ant-quic connection is still active. For connection validation, use `send_message`
1553    /// which will fail if the connection is closed.
1554    ///
1555    /// # Arguments
1556    /// * `peer_id` - The ID of the peer to check
1557    ///
1558    /// # Returns
1559    /// `true` if the peer exists in the peers map, `false` otherwise
1560    pub async fn is_peer_connected(&self, peer_id: &PeerId) -> bool {
1561        self.peers.read().await.contains_key(peer_id)
1562    }
1563
1564    /// Connect to a peer
1565    pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
1566        info!("Connecting to peer at: {}", address);
1567
1568        // Check production limits if resource manager is enabled
1569        let _connection_guard = if let Some(ref resource_manager) = self.resource_manager {
1570            Some(resource_manager.acquire_connection().await?)
1571        } else {
1572            None
1573        };
1574
1575        // Parse the address to SocketAddr format
1576        let socket_addr: std::net::SocketAddr = address.parse().map_err(|e| {
1577            P2PError::Network(crate::error::NetworkError::InvalidAddress(
1578                format!("{}: {}", address, e).into(),
1579            ))
1580        })?;
1581
1582        // Normalize wildcard addresses to loopback for local connections
1583        // This converts [::]:port → ::1:port and 0.0.0.0:port → 127.0.0.1:port
1584        let normalized_addr = normalize_wildcard_to_loopback(socket_addr);
1585        if normalized_addr != socket_addr {
1586            info!(
1587                "Normalized wildcard address {} to loopback {}",
1588                socket_addr, normalized_addr
1589            );
1590        }
1591
1592        // Establish a real connection via dual-stack Happy Eyeballs, but cap the wait
1593        let addr_list = vec![normalized_addr];
1594        let peer_id = match tokio::time::timeout(
1595            self.config.connection_timeout,
1596            self.dual_node.connect_happy_eyeballs(&addr_list),
1597        )
1598        .await
1599        {
1600            Ok(Ok(peer)) => {
1601                let connected_peer_id =
1602                    crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer);
1603                info!("Successfully connected to peer: {}", connected_peer_id);
1604
1605                // Prevent self-connections by checking if remote peer_id matches our own
1606                if connected_peer_id == self.peer_id {
1607                    warn!(
1608                        "Detected self-connection to own address {} (peer_id: {}), rejecting",
1609                        address, connected_peer_id
1610                    );
1611                    // Don't add this connection to our peer list - the underlying QUIC connection
1612                    // will eventually timeout, but we won't track it as a valid peer
1613                    return Err(P2PError::Network(
1614                        crate::error::NetworkError::InvalidAddress(
1615                            format!("Cannot connect to self ({})", address).into(),
1616                        ),
1617                    ));
1618                }
1619
1620                connected_peer_id
1621            }
1622            Ok(Err(e)) => {
1623                warn!("Failed to connect to peer at {}: {}", address, e);
1624                return Err(P2PError::Transport(
1625                    crate::error::TransportError::ConnectionFailed {
1626                        addr: normalized_addr,
1627                        reason: e.to_string().into(),
1628                    },
1629                ));
1630            }
1631            Err(_) => {
1632                warn!(
1633                    "Timed out connecting to peer at {} after {:?}",
1634                    address, self.config.connection_timeout
1635                );
1636                return Err(P2PError::Timeout(self.config.connection_timeout));
1637            }
1638        };
1639
1640        // Create peer info with connection details
1641        let peer_info = PeerInfo {
1642            peer_id: peer_id.clone(),
1643            addresses: vec![address.to_string()],
1644            connected_at: Instant::now(),
1645            last_seen: Instant::now(),
1646            status: ConnectionStatus::Connected,
1647            protocols: vec!["p2p-foundation/1.0".to_string()],
1648            heartbeat_count: 0,
1649        };
1650
1651        // Store peer information
1652        self.peers.write().await.insert(peer_id.clone(), peer_info);
1653
1654        // Add to active connections tracking
1655        // This is critical for is_connection_active() to work correctly
1656        self.active_connections
1657            .write()
1658            .await
1659            .insert(peer_id.clone());
1660
1661        // Record bandwidth usage if resource manager is enabled
1662        if let Some(ref resource_manager) = self.resource_manager {
1663            resource_manager.record_bandwidth(0, 0); // Placeholder for handshake data
1664        }
1665
1666        // Emit connection event
1667        let _ = self.event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1668
1669        info!("Connected to peer: {}", peer_id);
1670        Ok(peer_id)
1671    }
1672
1673    /// Disconnect from a peer
1674    pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1675        info!("Disconnecting from peer: {}", peer_id);
1676
1677        // Remove from active connections
1678        self.active_connections.write().await.remove(peer_id);
1679
1680        if let Some(mut peer_info) = self.peers.write().await.remove(peer_id) {
1681            peer_info.status = ConnectionStatus::Disconnected;
1682
1683            // Emit event
1684            let _ = self
1685                .event_tx
1686                .send(P2PEvent::PeerDisconnected(peer_id.clone()));
1687
1688            info!("Disconnected from peer: {}", peer_id);
1689        }
1690
1691        Ok(())
1692    }
1693
1694    /// Check if a connection to a peer is active
1695    pub async fn is_connection_active(&self, peer_id: &str) -> bool {
1696        self.active_connections.read().await.contains(peer_id)
1697    }
1698
1699    /// Send a message to a peer
1700    pub async fn send_message(
1701        &self,
1702        peer_id: &PeerId,
1703        protocol: &str,
1704        data: Vec<u8>,
1705    ) -> Result<()> {
1706        debug!(
1707            "Sending message to peer {} on protocol {}",
1708            peer_id, protocol
1709        );
1710
1711        // Check rate limits if resource manager is enabled
1712        if let Some(ref resource_manager) = self.resource_manager
1713            && !resource_manager
1714                .check_rate_limit(peer_id, "message")
1715                .await?
1716        {
1717            return Err(P2PError::ResourceExhausted(
1718                format!("Rate limit exceeded for peer {}", peer_id).into(),
1719            ));
1720        }
1721
1722        // Check if peer exists in peers map
1723        if !self.peers.read().await.contains_key(peer_id) {
1724            return Err(P2PError::Network(crate::error::NetworkError::PeerNotFound(
1725                peer_id.to_string().into(),
1726            )));
1727        }
1728
1729        // **NEW**: Check if the ant-quic connection is actually active
1730        // This is the critical fix for the connection state synchronization issue
1731        if !self.is_connection_active(peer_id).await {
1732            debug!(
1733                "Connection to peer {} exists in peers map but ant-quic connection is closed",
1734                peer_id
1735            );
1736
1737            // Clean up stale peer entry
1738            self.remove_peer(peer_id).await;
1739
1740            return Err(P2PError::Network(
1741                crate::error::NetworkError::ConnectionClosed {
1742                    peer_id: peer_id.to_string().into(),
1743                },
1744            ));
1745        }
1746
1747        // MCP removed: no special-case protocol validation
1748
1749        // Record bandwidth usage if resource manager is enabled
1750        if let Some(ref resource_manager) = self.resource_manager {
1751            resource_manager.record_bandwidth(data.len() as u64, 0);
1752        }
1753
1754        // Create protocol message wrapper
1755        let _message_data = self.create_protocol_message(protocol, data)?;
1756
1757        // Send via ant-quic dual-node
1758        let send_fut = self.dual_node.send_to_peer_string(peer_id, &_message_data);
1759        tokio::time::timeout(self.config.connection_timeout, send_fut)
1760            .await
1761            .map_err(|_| {
1762                P2PError::Transport(crate::error::TransportError::StreamError(
1763                    "Timed out sending message".into(),
1764                ))
1765            })?
1766            .map_err(|e| {
1767                P2PError::Transport(crate::error::TransportError::StreamError(
1768                    e.to_string().into(),
1769                ))
1770            })
1771    }
1772
1773    /// Create a protocol message wrapper
1774    fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1775        use serde_json::json;
1776
1777        let timestamp = std::time::SystemTime::now()
1778            .duration_since(std::time::UNIX_EPOCH)
1779            .map_err(|e| {
1780                P2PError::Network(NetworkError::ProtocolError(
1781                    format!("System time error: {}", e).into(),
1782                ))
1783            })?
1784            .as_secs();
1785
1786        // Create a simple message format for P2P communication
1787        let message = json!({
1788            "protocol": protocol,
1789            "data": data,
1790            "from": self.peer_id,
1791            "timestamp": timestamp
1792        });
1793
1794        serde_json::to_vec(&message).map_err(|e| {
1795            P2PError::Transport(crate::error::TransportError::StreamError(
1796                format!("Failed to serialize message: {e}").into(),
1797            ))
1798        })
1799    }
1800
1801    // Note: async listen_addrs() already exists above for fetching listen addresses
1802}
1803
1804/// Create a protocol message wrapper (static version for background tasks)
1805#[allow(dead_code)]
1806fn create_protocol_message_static(protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1807    use serde_json::json;
1808
1809    let timestamp = std::time::SystemTime::now()
1810        .duration_since(std::time::UNIX_EPOCH)
1811        .map_err(|e| {
1812            P2PError::Network(NetworkError::ProtocolError(
1813                format!("System time error: {}", e).into(),
1814            ))
1815        })?
1816        .as_secs();
1817
1818    // Create a simple message format for P2P communication
1819    let message = json!({
1820        "protocol": protocol,
1821        "data": data,
1822        "timestamp": timestamp
1823    });
1824
1825    serde_json::to_vec(&message).map_err(|e| {
1826        P2PError::Transport(crate::error::TransportError::StreamError(
1827            format!("Failed to serialize message: {e}").into(),
1828        ))
1829    })
1830}
1831
1832impl P2PNode {
1833    /// Subscribe to network events
1834    pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1835        self.event_tx.subscribe()
1836    }
1837
1838    /// Backwards-compat event stream accessor for tests
1839    pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1840        self.subscribe_events()
1841    }
1842
1843    /// Get node uptime
1844    pub fn uptime(&self) -> Duration {
1845        self.start_time.elapsed()
1846    }
1847
1848    // =========================================================================
1849    // Attestation Methods (Phase 1: Soft Enforcement)
1850    // =========================================================================
1851
1852    /// Compute the BLAKE3 hash of the running binary.
1853    ///
1854    /// In production, this reads the actual executable file and hashes it.
1855    /// Returns a placeholder hash if the binary cannot be read.
1856    fn compute_binary_hash() -> [u8; 32] {
1857        // Try to get the path to the current executable and hash it
1858        if let Some(hash) = std::env::current_exe()
1859            .ok()
1860            .and_then(|exe_path| std::fs::read(&exe_path).ok())
1861            .map(|binary_data| blake3::hash(&binary_data))
1862        {
1863            return *hash.as_bytes();
1864        }
1865        // Fallback: return a deterministic placeholder based on compile-time info
1866        // This allows tests and development to work without actual binary hashing
1867        let placeholder = format!(
1868            "saorsa-core-v{}-{}",
1869            env!("CARGO_PKG_VERSION"),
1870            std::env::consts::ARCH
1871        );
1872        let hash = blake3::hash(placeholder.as_bytes());
1873        *hash.as_bytes()
1874    }
1875
1876    /// Get this node's binary hash used for attestation.
1877    #[must_use]
1878    pub fn binary_hash(&self) -> &[u8; 32] {
1879        &self.binary_hash
1880    }
1881
1882    /// Get this node's entangled identity, if set.
1883    #[must_use]
1884    pub fn entangled_id(&self) -> Option<&crate::attestation::EntangledId> {
1885        self.entangled_id.as_ref()
1886    }
1887
1888    /// Set the entangled identity for this node.
1889    ///
1890    /// This should be called after the node's cryptographic identity is established,
1891    /// typically by deriving from the NodeIdentity's public key.
1892    pub fn set_entangled_id(&mut self, entangled_id: crate::attestation::EntangledId) {
1893        self.entangled_id = Some(entangled_id);
1894    }
1895
1896    /// Verify a peer's attestation and return the enforcement decision.
1897    ///
1898    /// This function implements the Entangled Attestation verification protocol
1899    /// (Phase 6: Hard Enforcement). Based on the configured enforcement mode:
1900    ///
1901    /// - **Off**: Skips verification entirely
1902    /// - **Soft**: Logs warnings but allows connections
1903    /// - **Hard**: Rejects connections with invalid attestations
1904    ///
1905    /// # Arguments
1906    /// * `peer_id` - The peer's identifier for logging
1907    /// * `peer_entangled_id` - The peer's claimed entangled ID
1908    /// * `peer_public_key` - The peer's ML-DSA public key
1909    ///
1910    /// # Returns
1911    /// An [`EnforcementDecision`] indicating whether to allow or reject the connection.
1912    ///
1913    /// # Example
1914    /// ```rust,ignore
1915    /// let decision = node.verify_peer_attestation(peer_id, &entangled_id, &public_key);
1916    /// if decision.should_reject() {
1917    ///     // Send rejection message and close connection
1918    ///     if let Some(rejection) = decision.rejection() {
1919    ///         send_rejection(peer_id, rejection);
1920    ///     }
1921    ///     disconnect(peer_id);
1922    /// }
1923    /// ```
1924    pub fn verify_peer_attestation(
1925        &self,
1926        peer_id: &str,
1927        peer_entangled_id: &crate::attestation::EntangledId,
1928        peer_public_key: &crate::quantum_crypto::ant_quic_integration::MlDsaPublicKey,
1929    ) -> crate::attestation::EnforcementDecision {
1930        use crate::attestation::{
1931            AttestationRejection, AttestationRejectionReason, EnforcementDecision, EnforcementMode,
1932        };
1933
1934        let config = &self.config.attestation_config;
1935
1936        // Skip verification if attestation is disabled
1937        if !config.enabled {
1938            return EnforcementDecision::Skipped;
1939        }
1940
1941        // Verify the entangled ID derivation
1942        let id_valid = peer_entangled_id.verify(peer_public_key);
1943
1944        // Check binary hash allowlist (if configured)
1945        let binary_hash = *peer_entangled_id.binary_hash();
1946        let binary_allowed = config.is_binary_allowed(&binary_hash);
1947
1948        match config.enforcement_mode {
1949            EnforcementMode::Off => EnforcementDecision::Skipped,
1950
1951            EnforcementMode::Soft => {
1952                // Soft enforcement: log warnings but allow connections
1953                if !id_valid {
1954                    warn!(
1955                        peer = %peer_id,
1956                        binary_hash = %hex::encode(&binary_hash[..8]),
1957                        "Peer attestation verification failed: Invalid entangled ID (soft mode - allowing)"
1958                    );
1959                    return EnforcementDecision::AllowWithWarning {
1960                        reason: AttestationRejectionReason::IdentityMismatch,
1961                    };
1962                }
1963                if !binary_allowed {
1964                    warn!(
1965                        peer = %peer_id,
1966                        binary_hash = %hex::encode(binary_hash),
1967                        "Peer attestation verification failed: Binary not in allowlist (soft mode - allowing)"
1968                    );
1969                    return EnforcementDecision::AllowWithWarning {
1970                        reason: AttestationRejectionReason::BinaryNotAllowed { hash: binary_hash },
1971                    };
1972                }
1973                EnforcementDecision::Allow
1974            }
1975
1976            EnforcementMode::Hard => {
1977                // Hard enforcement: reject invalid attestations
1978                if !id_valid {
1979                    error!(
1980                        peer = %peer_id,
1981                        binary_hash = %hex::encode(&binary_hash[..8]),
1982                        "REJECTING peer: Invalid entangled ID derivation"
1983                    );
1984                    return EnforcementDecision::Reject {
1985                        rejection: AttestationRejection::identity_mismatch(),
1986                    };
1987                }
1988                if !binary_allowed {
1989                    error!(
1990                        peer = %peer_id,
1991                        binary_hash = %hex::encode(binary_hash),
1992                        "REJECTING peer: Binary not in allowlist"
1993                    );
1994                    return EnforcementDecision::Reject {
1995                        rejection: AttestationRejection::binary_not_allowed(binary_hash),
1996                    };
1997                }
1998
1999                info!(
2000                    peer = %peer_id,
2001                    entangled_id = %hex::encode(&peer_entangled_id.id()[..8]),
2002                    "Peer attestation verified successfully (hard mode)"
2003                );
2004                EnforcementDecision::Allow
2005            }
2006        }
2007    }
2008
2009    /// Verify a peer's attestation and return a simple boolean result.
2010    ///
2011    /// This is a convenience method that wraps [`verify_peer_attestation`] for cases
2012    /// where only a pass/fail result is needed without the detailed decision.
2013    ///
2014    /// # Returns
2015    /// `true` if the connection should be allowed, `false` if it should be rejected.
2016    #[must_use]
2017    pub fn verify_peer_attestation_simple(
2018        &self,
2019        peer_id: &str,
2020        peer_entangled_id: &crate::attestation::EntangledId,
2021        peer_public_key: &crate::quantum_crypto::ant_quic_integration::MlDsaPublicKey,
2022    ) -> bool {
2023        self.verify_peer_attestation(peer_id, peer_entangled_id, peer_public_key)
2024            .should_allow()
2025    }
2026
2027    // MCP removed: all MCP tool/service methods removed
2028
2029    // /// Handle MCP remote tool call with network integration
2030
2031    // /// List tools available on a specific remote peer
2032
2033    // /// Get MCP server statistics
2034
2035    /// Get production resource metrics
2036    pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
2037        if let Some(ref resource_manager) = self.resource_manager {
2038            Ok(resource_manager.get_metrics().await)
2039        } else {
2040            Err(P2PError::Network(
2041                crate::error::NetworkError::ProtocolError(
2042                    "Production resource manager not enabled".to_string().into(),
2043                ),
2044            ))
2045        }
2046    }
2047
2048    /// Connection lifecycle monitor task - processes ant-quic connection events
2049    /// and updates active_connections HashSet and peers map.
2050    ///
2051    /// This version accepts a pre-subscribed receiver to avoid the race condition
2052    /// where early connections could be missed if subscription happens after the task starts.
2053    #[allow(clippy::too_many_arguments)]
2054    async fn connection_lifecycle_monitor_with_rx(
2055        _dual_node: Arc<DualStackNetworkNode>,
2056        mut event_rx: broadcast::Receiver<crate::transport::ant_quic_adapter::ConnectionEvent>,
2057        active_connections: Arc<RwLock<HashSet<String>>>,
2058        peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
2059        event_tx: broadcast::Sender<P2PEvent>,
2060        geo_provider: Arc<BgpGeoProvider>,
2061        _local_peer_id: String,
2062    ) {
2063        use crate::transport::ant_quic_adapter::ConnectionEvent;
2064
2065        info!("Connection lifecycle monitor started (pre-subscribed receiver)");
2066
2067        loop {
2068            match event_rx.recv().await {
2069                Ok(event) => {
2070                    match event {
2071                        ConnectionEvent::Established {
2072                            peer_id,
2073                            remote_address,
2074                        } => {
2075                            let peer_id_str =
2076                                crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
2077                            debug!(
2078                                "Connection established: peer={}, addr={}",
2079                                peer_id_str, remote_address
2080                            );
2081
2082                            // **GeoIP Validation**
2083                            let ip = remote_address.ip();
2084                            let is_rejected = match ip {
2085                                std::net::IpAddr::V4(v4) => {
2086                                    if let Some(asn) = geo_provider.lookup_ipv4_asn(v4) {
2087                                        geo_provider.is_hosting_asn(asn)
2088                                            || geo_provider.is_vpn_asn(asn)
2089                                    } else {
2090                                        false
2091                                    }
2092                                }
2093                                std::net::IpAddr::V6(v6) => {
2094                                    let info = geo_provider.lookup(v6);
2095                                    info.is_hosting_provider || info.is_vpn_provider
2096                                }
2097                            };
2098
2099                            if is_rejected {
2100                                info!(
2101                                    "Rejecting connection from {} ({}) due to GeoIP policy",
2102                                    peer_id_str, remote_address
2103                                );
2104                                continue;
2105                            }
2106
2107                            // Add to active connections
2108                            active_connections.write().await.insert(peer_id_str.clone());
2109
2110                            // Update peer info or insert new
2111                            let mut peers_lock = peers.write().await;
2112                            if let Some(peer_info) = peers_lock.get_mut(&peer_id_str) {
2113                                peer_info.status = ConnectionStatus::Connected;
2114                                peer_info.connected_at = Instant::now();
2115                            } else {
2116                                debug!("Registering new incoming peer: {}", peer_id_str);
2117                                peers_lock.insert(
2118                                    peer_id_str.clone(),
2119                                    PeerInfo {
2120                                        peer_id: peer_id_str.clone(),
2121                                        addresses: vec![remote_address.to_string()],
2122                                        status: ConnectionStatus::Connected,
2123                                        last_seen: Instant::now(),
2124                                        connected_at: Instant::now(),
2125                                        protocols: Vec::new(),
2126                                        heartbeat_count: 0,
2127                                    },
2128                                );
2129                            }
2130
2131                            // Broadcast connection event
2132                            let _ = event_tx.send(P2PEvent::PeerConnected(peer_id_str));
2133                        }
2134                        ConnectionEvent::Lost { peer_id, reason } => {
2135                            let peer_id_str =
2136                                crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
2137                            debug!("Connection lost: peer={}, reason={}", peer_id_str, reason);
2138
2139                            // Remove from active connections
2140                            active_connections.write().await.remove(&peer_id_str);
2141
2142                            // Update peer info status
2143                            if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
2144                                peer_info.status = ConnectionStatus::Disconnected;
2145                                peer_info.last_seen = Instant::now();
2146                            }
2147
2148                            // Broadcast disconnection event
2149                            let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
2150                        }
2151                        ConnectionEvent::Failed { peer_id, reason } => {
2152                            let peer_id_str =
2153                                crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
2154                            debug!("Connection failed: peer={}, reason={}", peer_id_str, reason);
2155
2156                            // Remove from active connections
2157                            active_connections.write().await.remove(&peer_id_str);
2158
2159                            // Update peer info status
2160                            if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
2161                                peer_info.status = ConnectionStatus::Disconnected;
2162                                peer_info.last_seen = Instant::now();
2163                            }
2164
2165                            // Broadcast disconnection event
2166                            let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
2167                        }
2168                    }
2169                }
2170                Err(broadcast::error::RecvError::Lagged(skipped)) => {
2171                    warn!(
2172                        "Connection event receiver lagged, skipped {} events",
2173                        skipped
2174                    );
2175                }
2176                Err(broadcast::error::RecvError::Closed) => {
2177                    info!("Connection event channel closed, stopping lifecycle monitor");
2178                    break;
2179                }
2180            }
2181        }
2182    }
2183
2184    /// Connection lifecycle monitor task - processes ant-quic connection events
2185    /// and updates active_connections HashSet and peers map
2186    ///
2187    /// DEPRECATED: Use `connection_lifecycle_monitor_with_rx` instead to avoid race conditions
2188    #[allow(dead_code)]
2189    async fn connection_lifecycle_monitor(
2190        dual_node: Arc<DualStackNetworkNode>,
2191        active_connections: Arc<RwLock<HashSet<String>>>,
2192        peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
2193        event_tx: broadcast::Sender<P2PEvent>,
2194        geo_provider: Arc<BgpGeoProvider>,
2195        local_peer_id: String,
2196    ) {
2197        use crate::transport::ant_quic_adapter::ConnectionEvent;
2198
2199        let mut event_rx = dual_node.subscribe_connection_events();
2200
2201        info!("Connection lifecycle monitor started");
2202
2203        loop {
2204            match event_rx.recv().await {
2205                Ok(event) => {
2206                    match event {
2207                        ConnectionEvent::Established {
2208                            peer_id,
2209                            remote_address,
2210                        } => {
2211                            let peer_id_str =
2212                                crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
2213                            debug!(
2214                                "Connection established: peer={}, addr={}",
2215                                peer_id_str, remote_address
2216                            );
2217
2218                            // **GeoIP Validation**
2219                            // Check if the peer's IP is allowed
2220                            let ip = remote_address.ip();
2221                            let is_rejected = match ip {
2222                                std::net::IpAddr::V4(v4) => {
2223                                    // Check if it's a hosting provider or VPN
2224                                    if let Some(asn) = geo_provider.lookup_ipv4_asn(v4) {
2225                                        geo_provider.is_hosting_asn(asn)
2226                                            || geo_provider.is_vpn_asn(asn)
2227                                    } else {
2228                                        false
2229                                    }
2230                                }
2231                                std::net::IpAddr::V6(v6) => {
2232                                    let info = geo_provider.lookup(v6);
2233                                    info.is_hosting_provider || info.is_vpn_provider
2234                                }
2235                            };
2236
2237                            if is_rejected {
2238                                info!(
2239                                    "Rejecting connection from {} ({}) due to GeoIP policy (Hosting/VPN)",
2240                                    peer_id_str, remote_address
2241                                );
2242
2243                                // Create rejection message
2244                                let rejection = RejectionMessage {
2245                                    reason: RejectionReason::GeoIpPolicy,
2246                                    message:
2247                                        "Connection rejected: Hosting/VPN providers not allowed"
2248                                            .to_string(),
2249                                    suggested_target: None, // Could suggest a different region if we knew more
2250                                };
2251
2252                                // Serialize message
2253                                if let Ok(data) = serde_json::to_vec(&rejection) {
2254                                    // Create protocol message
2255                                    let timestamp = std::time::SystemTime::now()
2256                                        .duration_since(std::time::UNIX_EPOCH)
2257                                        .unwrap_or_default()
2258                                        .as_secs();
2259
2260                                    let message = serde_json::json!({
2261                                        "protocol": "control",
2262                                        "data": data,
2263                                        "from": local_peer_id,
2264                                        "timestamp": timestamp
2265                                    });
2266
2267                                    if let Ok(msg_bytes) = serde_json::to_vec(&message) {
2268                                        // Send rejection message
2269                                        // We use send_to_peer directly on dual_node to avoid the checks in P2PNode::send_message
2270                                        // which might fail if we haven't fully registered the peer yet
2271                                        let _ = dual_node.send_to_peer(&peer_id, &msg_bytes).await;
2272
2273                                        // Give it a moment to send before disconnecting?
2274                                        // ant-quic might handle this, but a small yield is safe
2275                                        tokio::task::yield_now().await;
2276                                    }
2277                                }
2278
2279                                // Disconnect (TODO: Add disconnect method to dual_node or just drop?)
2280                                // For now, we just don't add it to active connections, effectively ignoring it
2281                                // Ideally we should actively close the connection
2282                                continue;
2283                            }
2284
2285                            // Add to active connections
2286                            active_connections.write().await.insert(peer_id_str.clone());
2287
2288                            // Update peer info or insert new
2289                            let mut peers_lock = peers.write().await;
2290                            if let Some(peer_info) = peers_lock.get_mut(&peer_id_str) {
2291                                peer_info.status = ConnectionStatus::Connected;
2292                                peer_info.connected_at = Instant::now();
2293                            } else {
2294                                // New incoming peer
2295                                debug!("Registering new incoming peer: {}", peer_id_str);
2296                                peers_lock.insert(
2297                                    peer_id_str.clone(),
2298                                    PeerInfo {
2299                                        peer_id: peer_id_str.clone(),
2300                                        addresses: vec![remote_address.to_string()],
2301                                        status: ConnectionStatus::Connected,
2302                                        last_seen: Instant::now(),
2303                                        connected_at: Instant::now(),
2304                                        protocols: Vec::new(),
2305                                        heartbeat_count: 0,
2306                                    },
2307                                );
2308                            }
2309
2310                            // Broadcast connection event
2311                            let _ = event_tx.send(P2PEvent::PeerConnected(peer_id_str));
2312                        }
2313                        ConnectionEvent::Lost { peer_id, reason } => {
2314                            let peer_id_str =
2315                                crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
2316                            debug!("Connection lost: peer={}, reason={}", peer_id_str, reason);
2317
2318                            // Remove from active connections
2319                            active_connections.write().await.remove(&peer_id_str);
2320
2321                            // Update peer info status
2322                            if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
2323                                peer_info.status = ConnectionStatus::Disconnected;
2324                                peer_info.last_seen = Instant::now();
2325                            }
2326
2327                            // Broadcast disconnection event
2328                            let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
2329                        }
2330                        ConnectionEvent::Failed { peer_id, reason } => {
2331                            let peer_id_str =
2332                                crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
2333                            warn!("Connection failed: peer={}, reason={}", peer_id_str, reason);
2334
2335                            // Remove from active connections
2336                            active_connections.write().await.remove(&peer_id_str);
2337
2338                            // Update peer info status
2339                            if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
2340                                peer_info.status = ConnectionStatus::Failed(reason.clone());
2341                            }
2342
2343                            // Broadcast disconnection event
2344                            let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
2345                        }
2346                    }
2347                }
2348                Err(broadcast::error::RecvError::Lagged(skipped)) => {
2349                    warn!(
2350                        "Connection event monitor lagged, skipped {} events",
2351                        skipped
2352                    );
2353                    continue;
2354                }
2355                Err(broadcast::error::RecvError::Closed) => {
2356                    info!("Connection event channel closed, stopping monitor");
2357                    break;
2358                }
2359            }
2360        }
2361
2362        info!("Connection lifecycle monitor stopped");
2363    }
2364
2365    /// Start connection monitor (called after node initialization)
2366    async fn start_connection_monitor(&self) {
2367        // The monitor task is already spawned in new() with a temporary peers map
2368        // This method is a placeholder for future enhancements where we might
2369        // need to restart the monitor or provide it with updated references
2370        debug!("Connection monitor already running from initialization");
2371    }
2372
2373    /// Keepalive task - sends periodic pings to prevent 30-second idle timeout
2374    ///
2375    /// ant-quic has a 30-second max_idle_timeout. This task sends a small keepalive
2376    /// message every 15 seconds (half the timeout) to all active connections to prevent
2377    /// them from timing out during periods of inactivity.
2378    async fn keepalive_task(
2379        active_connections: Arc<RwLock<HashSet<String>>>,
2380        dual_node: Arc<DualStackNetworkNode>,
2381        shutdown: Arc<AtomicBool>,
2382    ) {
2383        use tokio::time::{Duration, interval};
2384
2385        const KEEPALIVE_INTERVAL_SECS: u64 = 15; // Half of 30-second timeout
2386        const KEEPALIVE_PAYLOAD: &[u8] = b"keepalive"; // Small payload
2387
2388        let mut interval = interval(Duration::from_secs(KEEPALIVE_INTERVAL_SECS));
2389        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
2390
2391        info!(
2392            "Keepalive task started (interval: {}s)",
2393            KEEPALIVE_INTERVAL_SECS
2394        );
2395
2396        loop {
2397            // Check shutdown flag first
2398            if shutdown.load(Ordering::Relaxed) {
2399                info!("Keepalive task shutting down");
2400                break;
2401            }
2402
2403            interval.tick().await;
2404
2405            // Get snapshot of active connections
2406            let peers: Vec<String> = { active_connections.read().await.iter().cloned().collect() };
2407
2408            if peers.is_empty() {
2409                trace!("Keepalive: no active connections");
2410                continue;
2411            }
2412
2413            debug!("Sending keepalive to {} active connections", peers.len());
2414
2415            // Send keepalive to each peer
2416            for peer_id in peers {
2417                match dual_node
2418                    .send_to_peer_string(&peer_id, KEEPALIVE_PAYLOAD)
2419                    .await
2420                {
2421                    Ok(_) => {
2422                        trace!("Keepalive sent to peer: {}", peer_id);
2423                    }
2424                    Err(e) => {
2425                        debug!(
2426                            "Failed to send keepalive to peer {}: {} (connection may have closed)",
2427                            peer_id, e
2428                        );
2429                        // Don't remove from active_connections here - let the lifecycle monitor handle it
2430                    }
2431                }
2432            }
2433        }
2434
2435        info!("Keepalive task stopped");
2436    }
2437
2438    /// Check system health
2439    pub async fn health_check(&self) -> Result<()> {
2440        if let Some(ref resource_manager) = self.resource_manager {
2441            resource_manager.health_check().await
2442        } else {
2443            // Basic health check without resource manager
2444            let peer_count = self.peer_count().await;
2445            if peer_count > self.config.max_connections {
2446                Err(P2PError::Network(
2447                    crate::error::NetworkError::ProtocolError(
2448                        format!("Too many connections: {peer_count}").into(),
2449                    ),
2450                ))
2451            } else {
2452                Ok(())
2453            }
2454        }
2455    }
2456
2457    /// Get production configuration (if enabled)
2458    pub fn production_config(&self) -> Option<&ProductionConfig> {
2459        self.config.production_config.as_ref()
2460    }
2461
2462    /// Check if production hardening is enabled
2463    pub fn is_production_mode(&self) -> bool {
2464        self.resource_manager.is_some()
2465    }
2466
2467    /// Get DHT reference
2468    pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
2469        self.dht.as_ref()
2470    }
2471
2472    /// Store a value in the DHT
2473    pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
2474        if let Some(ref dht) = self.dht {
2475            let mut dht_instance = dht.write().await;
2476            let dht_key = crate::dht::DhtKey::from_bytes(key);
2477            dht_instance
2478                .store(&dht_key, value.clone())
2479                .await
2480                .map_err(|e| {
2481                    P2PError::Dht(crate::error::DhtError::StoreFailed(
2482                        format!("{:?}: {e}", key).into(),
2483                    ))
2484                })?;
2485
2486            Ok(())
2487        } else {
2488            Err(P2PError::Dht(crate::error::DhtError::RoutingError(
2489                "DHT not enabled".to_string().into(),
2490            )))
2491        }
2492    }
2493
2494    /// Retrieve a value from the DHT
2495    pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
2496        if let Some(ref dht) = self.dht {
2497            let dht_instance = dht.read().await;
2498            let dht_key = crate::dht::DhtKey::from_bytes(key);
2499            let record_result = dht_instance.retrieve(&dht_key).await.map_err(|e| {
2500                P2PError::Dht(crate::error::DhtError::StoreFailed(
2501                    format!("Retrieve failed: {e}").into(),
2502                ))
2503            })?;
2504
2505            Ok(record_result)
2506        } else {
2507            Err(P2PError::Dht(crate::error::DhtError::RoutingError(
2508                "DHT not enabled".to_string().into(),
2509            )))
2510        }
2511    }
2512
2513    /// Add a discovered peer to the bootstrap cache
2514    pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
2515        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2516            let manager = bootstrap_manager.write().await;
2517            let socket_addresses: Vec<std::net::SocketAddr> = addresses
2518                .iter()
2519                .filter_map(|addr| addr.parse().ok())
2520                .collect();
2521            let contact = ContactEntry::new(peer_id, socket_addresses);
2522            manager.add_contact(contact).await.map_err(|e| {
2523                P2PError::Network(crate::error::NetworkError::ProtocolError(
2524                    format!("Failed to add peer to bootstrap cache: {e}").into(),
2525                ))
2526            })?;
2527        }
2528        Ok(())
2529    }
2530
2531    /// Update connection metrics for a peer in the bootstrap cache
2532    pub async fn update_peer_metrics(
2533        &self,
2534        peer_id: &PeerId,
2535        success: bool,
2536        latency_ms: Option<u64>,
2537        _error: Option<String>,
2538    ) -> Result<()> {
2539        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2540            let manager = bootstrap_manager.write().await;
2541
2542            // Create quality metrics based on the connection result
2543            let metrics = QualityMetrics {
2544                success_rate: if success { 1.0 } else { 0.0 },
2545                avg_latency_ms: latency_ms.unwrap_or(0) as f64,
2546                quality_score: if success { 0.8 } else { 0.2 }, // Initial score
2547                last_connection_attempt: chrono::Utc::now(),
2548                last_successful_connection: if success {
2549                    chrono::Utc::now()
2550                } else {
2551                    chrono::Utc::now() - chrono::Duration::hours(1)
2552                },
2553                uptime_score: 0.5,
2554            };
2555
2556            manager
2557                .update_contact_metrics(peer_id, metrics)
2558                .await
2559                .map_err(|e| {
2560                    P2PError::Network(crate::error::NetworkError::ProtocolError(
2561                        format!("Failed to update peer metrics: {e}").into(),
2562                    ))
2563                })?;
2564        }
2565        Ok(())
2566    }
2567
2568    /// Get bootstrap cache statistics
2569    pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
2570        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2571            let manager = bootstrap_manager.read().await;
2572            let stats = manager.get_stats().await.map_err(|e| {
2573                P2PError::Network(crate::error::NetworkError::ProtocolError(
2574                    format!("Failed to get bootstrap stats: {e}").into(),
2575                ))
2576            })?;
2577            Ok(Some(stats))
2578        } else {
2579            Ok(None)
2580        }
2581    }
2582
2583    /// Get the number of cached bootstrap peers
2584    pub async fn cached_peer_count(&self) -> usize {
2585        if let Some(ref _bootstrap_manager) = self.bootstrap_manager
2586            && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
2587        {
2588            return stats.total_contacts;
2589        }
2590        0
2591    }
2592
2593    /// Connect to bootstrap peers
2594    async fn connect_bootstrap_peers(&self) -> Result<()> {
2595        let mut bootstrap_contacts = Vec::new();
2596        let mut used_cache = false;
2597        let mut seen_addresses = std::collections::HashSet::new();
2598
2599        // CLI-provided bootstrap peers take priority - always include them first
2600        let cli_bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
2601            self.config.bootstrap_peers_str.clone()
2602        } else {
2603            // Convert Multiaddr to strings
2604            self.config
2605                .bootstrap_peers
2606                .iter()
2607                .map(|addr| addr.to_string())
2608                .collect::<Vec<_>>()
2609        };
2610
2611        if !cli_bootstrap_peers.is_empty() {
2612            info!(
2613                "Using {} CLI-provided bootstrap peers (priority)",
2614                cli_bootstrap_peers.len()
2615            );
2616            for addr in &cli_bootstrap_peers {
2617                if let Ok(socket_addr) = addr.parse::<std::net::SocketAddr>() {
2618                    seen_addresses.insert(socket_addr);
2619                    let contact = ContactEntry::new(
2620                        format!("cli_peer_{}", addr.chars().take(8).collect::<String>()),
2621                        vec![socket_addr],
2622                    );
2623                    bootstrap_contacts.push(contact);
2624                } else {
2625                    warn!("Invalid bootstrap address format: {}", addr);
2626                }
2627            }
2628        }
2629
2630        // Supplement with cached bootstrap peers (after CLI peers)
2631        // Use QUIC-specific peer selection since we're using ant-quic transport
2632        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2633            let manager = bootstrap_manager.read().await;
2634            match manager.get_quic_bootstrap_peers(20).await {
2635                // Try to get top 20 quality QUIC-enabled peers
2636                Ok(contacts) => {
2637                    if !contacts.is_empty() {
2638                        let mut added_from_cache = 0;
2639                        for contact in contacts {
2640                            // Only add if we haven't already added this address from CLI
2641                            let new_addresses: Vec<_> = contact
2642                                .addresses
2643                                .iter()
2644                                .filter(|addr| !seen_addresses.contains(addr))
2645                                .copied()
2646                                .collect();
2647
2648                            if !new_addresses.is_empty() {
2649                                for addr in &new_addresses {
2650                                    seen_addresses.insert(*addr);
2651                                }
2652                                let mut contact = contact.clone();
2653                                contact.addresses = new_addresses;
2654                                bootstrap_contacts.push(contact);
2655                                added_from_cache += 1;
2656                            }
2657                        }
2658                        if added_from_cache > 0 {
2659                            info!(
2660                                "Added {} cached bootstrap peers (supplementing CLI peers)",
2661                                added_from_cache
2662                            );
2663                            used_cache = true;
2664                        }
2665                    }
2666                }
2667                Err(e) => {
2668                    warn!("Failed to get cached bootstrap peers: {}", e);
2669                }
2670            }
2671        }
2672
2673        if bootstrap_contacts.is_empty() {
2674            info!("No bootstrap peers configured and no cached peers available");
2675            return Ok(());
2676        }
2677
2678        // Connect to bootstrap peers
2679        let mut successful_connections = 0;
2680        for contact in bootstrap_contacts {
2681            for addr in &contact.addresses {
2682                match self.connect_peer(&addr.to_string()).await {
2683                    Ok(peer_id) => {
2684                        info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
2685                        successful_connections += 1;
2686
2687                        // Update bootstrap cache with successful connection
2688                        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2689                            let manager = bootstrap_manager.write().await;
2690                            let mut updated_contact = contact.clone();
2691                            updated_contact.peer_id = peer_id.clone();
2692                            updated_contact.update_connection_result(true, Some(100), None); // Assume 100ms latency for now
2693
2694                            if let Err(e) = manager.add_contact(updated_contact).await {
2695                                warn!("Failed to update bootstrap cache: {}", e);
2696                            }
2697                        }
2698                        break; // Successfully connected, move to next contact
2699                    }
2700                    Err(e) => {
2701                        warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
2702
2703                        // Update bootstrap cache with failed connection
2704                        if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
2705                            let manager = bootstrap_manager.write().await;
2706                            let mut updated_contact = contact.clone();
2707                            updated_contact.update_connection_result(
2708                                false,
2709                                None,
2710                                Some(e.to_string()),
2711                            );
2712
2713                            if let Err(e) = manager.add_contact(updated_contact).await {
2714                                warn!("Failed to update bootstrap cache: {}", e);
2715                            }
2716                        }
2717                    }
2718                }
2719            }
2720        }
2721
2722        if successful_connections == 0 {
2723            if !used_cache {
2724                warn!("Failed to connect to any bootstrap peers");
2725            }
2726            // Starting a node should not be gated on immediate bootstrap connectivity.
2727            // Keep running and allow background discovery / retries to populate peers later.
2728            return Ok(());
2729        }
2730        info!(
2731            "Successfully connected to {} bootstrap peers",
2732            successful_connections
2733        );
2734
2735        Ok(())
2736    }
2737
2738    /// Disconnect from all peers
2739    async fn disconnect_all_peers(&self) -> Result<()> {
2740        let peer_ids: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();
2741
2742        for peer_id in peer_ids {
2743            self.disconnect_peer(&peer_id).await?;
2744        }
2745
2746        Ok(())
2747    }
2748
2749    /// Perform periodic maintenance tasks
2750    async fn periodic_tasks(&self) -> Result<()> {
2751        // Update peer last seen timestamps
2752        // Remove stale connections
2753        // Perform DHT maintenance
2754        // This is a placeholder for now
2755
2756        Ok(())
2757    }
2758}
2759
2760/// Network sender trait for sending messages
2761#[async_trait::async_trait]
2762pub trait NetworkSender: Send + Sync {
2763    /// Send a message to a specific peer
2764    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;
2765
2766    /// Get our local peer ID
2767    fn local_peer_id(&self) -> &PeerId;
2768}
2769
2770/// Lightweight wrapper for P2PNode to implement NetworkSender
2771#[derive(Clone)]
2772pub struct P2PNetworkSender {
2773    peer_id: PeerId,
2774    // Use channels for async communication with the P2P node
2775    send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2776}
2777
2778impl P2PNetworkSender {
2779    pub fn new(
2780        peer_id: PeerId,
2781        send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2782    ) -> Self {
2783        Self { peer_id, send_tx }
2784    }
2785}
2786
2787/// Implementation of NetworkSender trait for P2PNetworkSender
2788#[async_trait::async_trait]
2789impl NetworkSender for P2PNetworkSender {
2790    /// Send a message to a specific peer via the P2P network
2791    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
2792        self.send_tx
2793            .send((peer_id.clone(), protocol.to_string(), data))
2794            .map_err(|_| {
2795                P2PError::Network(crate::error::NetworkError::ProtocolError(
2796                    "Failed to send message via channel".to_string().into(),
2797                ))
2798            })?;
2799        Ok(())
2800    }
2801
2802    /// Get our local peer ID
2803    fn local_peer_id(&self) -> &PeerId {
2804        &self.peer_id
2805    }
2806}
2807
2808/// Builder pattern for creating P2P nodes
2809pub struct NodeBuilder {
2810    config: NodeConfig,
2811}
2812
2813impl Default for NodeBuilder {
2814    fn default() -> Self {
2815        Self::new()
2816    }
2817}
2818
2819impl NodeBuilder {
2820    /// Create a new node builder
2821    pub fn new() -> Self {
2822        Self {
2823            config: NodeConfig::default(),
2824        }
2825    }
2826
2827    /// Set the peer ID
2828    pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
2829        self.config.peer_id = Some(peer_id);
2830        self
2831    }
2832
2833    /// Add a listen address
2834    pub fn listen_on(mut self, addr: &str) -> Self {
2835        if let Ok(multiaddr) = addr.parse() {
2836            self.config.listen_addrs.push(multiaddr);
2837        }
2838        self
2839    }
2840
2841    /// Add a bootstrap peer
2842    pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
2843        if let Ok(multiaddr) = addr.parse() {
2844            self.config.bootstrap_peers.push(multiaddr);
2845        }
2846        self.config.bootstrap_peers_str.push(addr.to_string());
2847        self
2848    }
2849
2850    /// Enable IPv6 support
2851    pub fn with_ipv6(mut self, enable: bool) -> Self {
2852        self.config.enable_ipv6 = enable;
2853        self
2854    }
2855
2856    // MCP removed: builder methods deleted
2857
2858    /// Set connection timeout
2859    pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
2860        self.config.connection_timeout = timeout;
2861        self
2862    }
2863
2864    /// Set maximum connections
2865    pub fn with_max_connections(mut self, max: usize) -> Self {
2866        self.config.max_connections = max;
2867        self
2868    }
2869
2870    /// Enable production mode with default configuration
2871    pub fn with_production_mode(mut self) -> Self {
2872        self.config.production_config = Some(ProductionConfig::default());
2873        self
2874    }
2875
2876    /// Configure production settings
2877    pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
2878        self.config.production_config = Some(production_config);
2879        self
2880    }
2881
2882    /// Configure IP diversity limits for Sybil protection.
2883    pub fn with_diversity_config(
2884        mut self,
2885        diversity_config: crate::security::IPDiversityConfig,
2886    ) -> Self {
2887        self.config.diversity_config = Some(diversity_config);
2888        self
2889    }
2890
2891    /// Configure DHT settings
2892    pub fn with_dht(mut self, dht_config: DHTConfig) -> Self {
2893        self.config.dht_config = dht_config;
2894        self
2895    }
2896
2897    /// Enable DHT with default configuration
2898    pub fn with_default_dht(mut self) -> Self {
2899        self.config.dht_config = DHTConfig::default();
2900        self
2901    }
2902
2903    /// Build the P2P node
2904    pub async fn build(self) -> Result<P2PNode> {
2905        P2PNode::new(self.config).await
2906    }
2907}
2908
2909#[cfg(test)]
2910#[allow(clippy::unwrap_used, clippy::expect_used)]
2911mod diversity_tests {
2912    use super::*;
2913    use crate::security::IPDiversityConfig;
2914
2915    async fn build_bootstrap_manager_like_prod(config: &NodeConfig) -> BootstrapManager {
2916        let diversity_config = config.diversity_config.clone().unwrap_or_default();
2917        // Use a temp dir to avoid conflicts with cached files from old format
2918        let temp_dir = tempfile::TempDir::new().expect("temp dir");
2919        let mut cache_config = config
2920            .bootstrap_cache_config
2921            .clone()
2922            .unwrap_or_else(crate::bootstrap::CacheConfig::default);
2923        cache_config.cache_dir = temp_dir.path().to_path_buf();
2924
2925        BootstrapManager::with_full_config(
2926            cache_config,
2927            crate::rate_limit::JoinRateLimiterConfig::default(),
2928            diversity_config,
2929        )
2930        .await
2931        .expect("bootstrap manager")
2932    }
2933
2934    #[tokio::test]
2935    async fn test_nodeconfig_diversity_config_used_for_bootstrap() {
2936        let config = NodeConfig {
2937            diversity_config: Some(IPDiversityConfig::testnet()),
2938            ..Default::default()
2939        };
2940
2941        let manager = build_bootstrap_manager_like_prod(&config).await;
2942        assert!(manager.diversity_config().is_relaxed());
2943        assert_eq!(manager.diversity_config().max_nodes_per_asn, 5000);
2944    }
2945}
2946
2947/// Standalone function to handle received messages without borrowing self
2948#[allow(dead_code)] // Deprecated during ant-quic migration
2949async fn handle_received_message_standalone(
2950    message_data: Vec<u8>,
2951    peer_id: &PeerId,
2952    _protocol: &str,
2953    event_tx: &broadcast::Sender<P2PEvent>,
2954) -> Result<()> {
2955    // Parse the message format
2956    match serde_json::from_slice::<serde_json::Value>(&message_data) {
2957        Ok(message) => {
2958            if let (Some(protocol), Some(data), Some(from)) = (
2959                message.get("protocol").and_then(|v| v.as_str()),
2960                message.get("data").and_then(|v| v.as_array()),
2961                message.get("from").and_then(|v| v.as_str()),
2962            ) {
2963                // Convert data array back to bytes
2964                let data_bytes: Vec<u8> = data
2965                    .iter()
2966                    .filter_map(|v| v.as_u64().map(|n| n as u8))
2967                    .collect();
2968
2969                // Generate message event
2970                let event = P2PEvent::Message {
2971                    topic: protocol.to_string(),
2972                    source: from.to_string(),
2973                    data: data_bytes,
2974                };
2975
2976                let _ = event_tx.send(event);
2977                debug!("Generated message event from peer: {}", peer_id);
2978            }
2979        }
2980        Err(e) => {
2981            warn!("Failed to parse received message from {}: {}", peer_id, e);
2982        }
2983    }
2984
2985    Ok(())
2986}
2987
2988// MCP removed: standalone MCP handler deleted
2989
2990/// Helper function to handle protocol message creation
2991#[allow(dead_code)]
2992fn handle_protocol_message_creation(protocol: &str, data: Vec<u8>) -> Option<Vec<u8>> {
2993    match create_protocol_message_static(protocol, data) {
2994        Ok(msg) => Some(msg),
2995        Err(e) => {
2996            warn!("Failed to create protocol message: {}", e);
2997            None
2998        }
2999    }
3000}
3001
3002/// Helper function to handle message send result
3003#[allow(dead_code)]
3004async fn handle_message_send_result(result: crate::error::P2pResult<()>, peer_id: &PeerId) {
3005    match result {
3006        Ok(_) => {
3007            debug!("Message sent to peer {} via transport layer", peer_id);
3008        }
3009        Err(e) => {
3010            warn!("Failed to send message to peer {}: {}", peer_id, e);
3011        }
3012    }
3013}
3014
3015/// Helper function to check rate limit
3016#[allow(dead_code)] // Deprecated during ant-quic migration
3017fn check_rate_limit(
3018    rate_limiter: &RateLimiter,
3019    socket_addr: &std::net::SocketAddr,
3020    remote_addr: &NetworkAddress,
3021) -> Result<()> {
3022    rate_limiter.check_ip(&socket_addr.ip()).map_err(|e| {
3023        warn!("Rate limit exceeded for {}: {}", remote_addr, e);
3024        e
3025    })
3026}
3027
3028/// Helper function to register a new peer
3029#[allow(dead_code)] // Deprecated during ant-quic migration
3030async fn register_new_peer(
3031    peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
3032    peer_id: &PeerId,
3033    remote_addr: &NetworkAddress,
3034) {
3035    let mut peers_guard = peers.write().await;
3036    let peer_info = PeerInfo {
3037        peer_id: peer_id.clone(),
3038        addresses: vec![remote_addr.to_string()],
3039        connected_at: tokio::time::Instant::now(),
3040        last_seen: tokio::time::Instant::now(),
3041        status: ConnectionStatus::Connected,
3042        protocols: vec!["p2p-chat/1.0.0".to_string()],
3043        heartbeat_count: 0,
3044    };
3045    peers_guard.insert(peer_id.clone(), peer_info);
3046}
3047
3048/// Helper function to spawn connection handler
3049#[allow(dead_code)] // Deprecated during ant-quic migration
3050fn spawn_connection_handler(
3051    connection: Box<dyn crate::transport::Connection>,
3052    peer_id: PeerId,
3053    event_tx: broadcast::Sender<P2PEvent>,
3054    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
3055) {
3056    tokio::spawn(async move {
3057        handle_peer_connection(connection, peer_id, event_tx, peers).await;
3058    });
3059}
3060
3061/// Helper function to handle peer connection
3062#[allow(dead_code)] // Deprecated during ant-quic migration
3063async fn handle_peer_connection(
3064    mut connection: Box<dyn crate::transport::Connection>,
3065    peer_id: PeerId,
3066    event_tx: broadcast::Sender<P2PEvent>,
3067    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
3068) {
3069    loop {
3070        match connection.receive().await {
3071            Ok(message_data) => {
3072                debug!(
3073                    "Received {} bytes from peer: {}",
3074                    message_data.len(),
3075                    peer_id
3076                );
3077
3078                // Handle the received message
3079                if let Err(e) = handle_received_message_standalone(
3080                    message_data,
3081                    &peer_id,
3082                    "unknown", // TODO: Extract protocol from message
3083                    &event_tx,
3084                )
3085                .await
3086                {
3087                    warn!("Failed to handle message from peer {}: {}", peer_id, e);
3088                }
3089            }
3090            Err(e) => {
3091                warn!("Failed to receive message from {}: {}", peer_id, e);
3092
3093                // Check if connection is still alive
3094                if !connection.is_alive().await {
3095                    info!("Connection to {} is dead, removing peer", peer_id);
3096
3097                    // Remove dead peer
3098                    remove_peer(&peers, &peer_id).await;
3099
3100                    // Generate peer disconnected event
3101                    let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id.clone()));
3102
3103                    break; // Exit the message receiving loop
3104                }
3105
3106                // Brief pause before retrying
3107                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
3108            }
3109        }
3110    }
3111}
3112
3113/// Helper function to remove a peer
3114#[allow(dead_code)] // Deprecated during ant-quic migration
3115async fn remove_peer(peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>, peer_id: &PeerId) {
3116    let mut peers_guard = peers.write().await;
3117    peers_guard.remove(peer_id);
3118}
3119
3120/// Helper function to update peer heartbeat
3121#[allow(dead_code)]
3122async fn update_peer_heartbeat(
3123    peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
3124    peer_id: &PeerId,
3125) -> Result<()> {
3126    let mut peers_guard = peers.write().await;
3127    match peers_guard.get_mut(peer_id) {
3128        Some(peer_info) => {
3129            peer_info.last_seen = Instant::now();
3130            peer_info.heartbeat_count += 1;
3131            Ok(())
3132        }
3133        None => {
3134            warn!("Received heartbeat from unknown peer: {}", peer_id);
3135            Err(P2PError::Network(NetworkError::PeerNotFound(
3136                format!("Peer {} not found", peer_id).into(),
3137            )))
3138        }
3139    }
3140}
3141
3142/// Helper function to get resource metrics
3143#[allow(dead_code)]
3144async fn get_resource_metrics(resource_manager: &Option<Arc<ResourceManager>>) -> (u64, f64) {
3145    if let Some(manager) = resource_manager {
3146        let metrics = manager.get_metrics().await;
3147        (metrics.memory_used, metrics.cpu_usage)
3148    } else {
3149        (0, 0.0)
3150    }
3151}
3152
3153#[cfg(test)]
3154mod tests {
3155    use super::*;
3156    // MCP removed from tests
3157    use std::time::Duration;
3158    use tokio::time::timeout;
3159
3160    // Test tool handler for network tests
3161
3162    // MCP removed
3163
3164    /// Helper function to create a test node configuration
3165    fn create_test_node_config() -> NodeConfig {
3166        NodeConfig {
3167            peer_id: Some("test_peer_123".to_string()),
3168            listen_addrs: vec![
3169                std::net::SocketAddr::new(std::net::IpAddr::V6(std::net::Ipv6Addr::LOCALHOST), 0),
3170                std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
3171            ],
3172            listen_addr: std::net::SocketAddr::new(
3173                std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
3174                0,
3175            ),
3176            bootstrap_peers: vec![],
3177            bootstrap_peers_str: vec![],
3178            enable_ipv6: true,
3179
3180            connection_timeout: Duration::from_secs(2),
3181            keep_alive_interval: Duration::from_secs(30),
3182            max_connections: 100,
3183            max_incoming_connections: 50,
3184            dht_config: DHTConfig::default(),
3185            security_config: SecurityConfig::default(),
3186            production_config: None,
3187            bootstrap_cache_config: None,
3188            diversity_config: None,
3189            attestation_config: crate::attestation::AttestationConfig::default(),
3190        }
3191    }
3192
3193    /// Helper function to create a test tool
3194    // MCP removed: test tool helper deleted
3195
3196    #[tokio::test]
3197    async fn test_node_config_default() {
3198        let config = NodeConfig::default();
3199
3200        assert!(config.peer_id.is_none());
3201        assert_eq!(config.listen_addrs.len(), 2);
3202        assert!(config.enable_ipv6);
3203        assert_eq!(config.max_connections, 10000); // Fixed: matches actual default
3204        assert_eq!(config.max_incoming_connections, 100);
3205        assert_eq!(config.connection_timeout, Duration::from_secs(30));
3206    }
3207
3208    #[tokio::test]
3209    async fn test_dht_config_default() {
3210        let config = DHTConfig::default();
3211
3212        assert_eq!(config.k_value, 20);
3213        assert_eq!(config.alpha_value, 5);
3214        assert_eq!(config.record_ttl, Duration::from_secs(3600));
3215        assert_eq!(config.refresh_interval, Duration::from_secs(600));
3216    }
3217
3218    #[tokio::test]
3219    async fn test_security_config_default() {
3220        let config = SecurityConfig::default();
3221
3222        assert!(config.enable_noise);
3223        assert!(config.enable_tls);
3224        assert_eq!(config.trust_level, TrustLevel::Basic);
3225    }
3226
3227    #[test]
3228    fn test_trust_level_variants() {
3229        // Test that all trust level variants can be created
3230        let _none = TrustLevel::None;
3231        let _basic = TrustLevel::Basic;
3232        let _full = TrustLevel::Full;
3233
3234        // Test equality
3235        assert_eq!(TrustLevel::None, TrustLevel::None);
3236        assert_eq!(TrustLevel::Basic, TrustLevel::Basic);
3237        assert_eq!(TrustLevel::Full, TrustLevel::Full);
3238        assert_ne!(TrustLevel::None, TrustLevel::Basic);
3239    }
3240
3241    #[test]
3242    fn test_connection_status_variants() {
3243        let connecting = ConnectionStatus::Connecting;
3244        let connected = ConnectionStatus::Connected;
3245        let disconnecting = ConnectionStatus::Disconnecting;
3246        let disconnected = ConnectionStatus::Disconnected;
3247        let failed = ConnectionStatus::Failed("test error".to_string());
3248
3249        assert_eq!(connecting, ConnectionStatus::Connecting);
3250        assert_eq!(connected, ConnectionStatus::Connected);
3251        assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
3252        assert_eq!(disconnected, ConnectionStatus::Disconnected);
3253        assert_ne!(connecting, connected);
3254
3255        if let ConnectionStatus::Failed(msg) = failed {
3256            assert_eq!(msg, "test error");
3257        } else {
3258            panic!("Expected Failed status");
3259        }
3260    }
3261
3262    #[tokio::test]
3263    async fn test_node_creation() -> Result<()> {
3264        let config = create_test_node_config();
3265        let node = P2PNode::new(config).await?;
3266
3267        assert_eq!(node.peer_id(), "test_peer_123");
3268        assert!(!node.is_running().await);
3269        assert_eq!(node.peer_count().await, 0);
3270        assert!(node.connected_peers().await.is_empty());
3271
3272        Ok(())
3273    }
3274
3275    #[tokio::test]
3276    async fn test_node_creation_without_peer_id() -> Result<()> {
3277        let mut config = create_test_node_config();
3278        config.peer_id = None;
3279
3280        let node = P2PNode::new(config).await?;
3281
3282        // Should have generated a peer ID
3283        assert!(node.peer_id().starts_with("peer_"));
3284        assert!(!node.is_running().await);
3285
3286        Ok(())
3287    }
3288
3289    #[tokio::test]
3290    async fn test_node_lifecycle() -> Result<()> {
3291        let config = create_test_node_config();
3292        let node = P2PNode::new(config).await?;
3293
3294        // Initially not running
3295        assert!(!node.is_running().await);
3296
3297        // Start the node
3298        node.start().await?;
3299        assert!(node.is_running().await);
3300
3301        // Check listen addresses were set (at least one)
3302        let listen_addrs = node.listen_addrs().await;
3303        assert!(
3304            !listen_addrs.is_empty(),
3305            "Expected at least one listening address"
3306        );
3307
3308        // Stop the node
3309        node.stop().await?;
3310        assert!(!node.is_running().await);
3311
3312        Ok(())
3313    }
3314
3315    #[tokio::test]
3316    async fn test_peer_connection() -> Result<()> {
3317        let config1 = create_test_node_config();
3318        let mut config2 = create_test_node_config();
3319        config2.peer_id = Some("test_peer_456".to_string());
3320
3321        let node1 = P2PNode::new(config1).await?;
3322        let node2 = P2PNode::new(config2).await?;
3323
3324        node1.start().await?;
3325        node2.start().await?;
3326
3327        let node2_addr = node2
3328            .listen_addrs()
3329            .await
3330            .into_iter()
3331            .find(|a| a.ip().is_ipv4())
3332            .ok_or_else(|| {
3333                P2PError::Network(crate::error::NetworkError::InvalidAddress(
3334                    "Node 2 did not expose an IPv4 listen address".into(),
3335                ))
3336            })?;
3337
3338        // Connect to a real peer
3339        let peer_id = node1.connect_peer(&node2_addr.to_string()).await?;
3340
3341        // Check peer count
3342        assert_eq!(node1.peer_count().await, 1);
3343
3344        // Check connected peers
3345        let connected_peers = node1.connected_peers().await;
3346        assert_eq!(connected_peers.len(), 1);
3347        assert_eq!(connected_peers[0], peer_id);
3348
3349        // Get peer info
3350        let peer_info = node1.peer_info(&peer_id).await;
3351        assert!(peer_info.is_some());
3352        let info = peer_info.expect("Peer info should exist after adding peer");
3353        assert_eq!(info.peer_id, peer_id);
3354        assert_eq!(info.status, ConnectionStatus::Connected);
3355        assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
3356
3357        // Disconnect from peer
3358        node1.disconnect_peer(&peer_id).await?;
3359        assert_eq!(node1.peer_count().await, 0);
3360
3361        node1.stop().await?;
3362        node2.stop().await?;
3363
3364        Ok(())
3365    }
3366
3367    // TODO(windows): Investigate QUIC connection issues on Windows CI
3368    // This test consistently fails on Windows GitHub Actions runners with
3369    // "All connect attempts failed" even with IPv4-only config, long delays,
3370    // and multiple retry attempts. The underlying ant-quic library may have
3371    // issues on Windows that need investigation.
3372    // See: https://github.com/dirvine/saorsa-core/issues/TBD
3373    #[cfg_attr(target_os = "windows", ignore)]
3374    #[tokio::test]
3375    async fn test_event_subscription() -> Result<()> {
3376        // Configure both nodes to use only IPv4 for reliable cross-platform testing
3377        // This is important because:
3378        // 1. local_addr() returns the first address from listen_addrs
3379        // 2. The default config puts IPv6 first, which may not work on all Windows setups
3380        let ipv4_localhost =
3381            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
3382
3383        let mut config1 = create_test_node_config();
3384        config1.listen_addr = ipv4_localhost;
3385        config1.listen_addrs = vec![ipv4_localhost];
3386        config1.enable_ipv6 = false;
3387
3388        let mut config2 = create_test_node_config();
3389        config2.peer_id = Some("test_peer_456".to_string());
3390        config2.listen_addr = ipv4_localhost;
3391        config2.listen_addrs = vec![ipv4_localhost];
3392        config2.enable_ipv6 = false;
3393
3394        let node1 = P2PNode::new(config1).await?;
3395        let node2 = P2PNode::new(config2).await?;
3396
3397        node1.start().await?;
3398        node2.start().await?;
3399
3400        // Wait for nodes to fully bind their listening sockets
3401        // Windows network stack initialization can be significantly slower
3402        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
3403
3404        let mut events = node1.subscribe_events();
3405
3406        // Get the actual listening address using local_addr() for reliability
3407        let node2_addr = node2.local_addr().ok_or_else(|| {
3408            P2PError::Network(crate::error::NetworkError::ProtocolError(
3409                "No listening address".to_string().into(),
3410            ))
3411        })?;
3412
3413        // Connect to a peer with retry logic for Windows reliability
3414        // The QUIC library may need additional time to fully initialize
3415        let mut peer_id = None;
3416        for attempt in 0..3 {
3417            if attempt > 0 {
3418                tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
3419            }
3420            match timeout(Duration::from_secs(2), node1.connect_peer(&node2_addr)).await {
3421                Ok(Ok(id)) => {
3422                    peer_id = Some(id);
3423                    break;
3424                }
3425                Ok(Err(_)) | Err(_) => continue,
3426            }
3427        }
3428        let peer_id = peer_id.ok_or_else(|| {
3429            P2PError::Network(crate::error::NetworkError::ProtocolError(
3430                "Failed to connect after 3 attempts".to_string().into(),
3431            ))
3432        })?;
3433
3434        // Check for PeerConnected event
3435        let event = timeout(Duration::from_secs(2), events.recv()).await;
3436        assert!(event.is_ok());
3437
3438        let event_result = event
3439            .expect("Should receive event")
3440            .expect("Event should not be error");
3441        match event_result {
3442            P2PEvent::PeerConnected(event_peer_id) => {
3443                assert_eq!(event_peer_id, peer_id);
3444            }
3445            _ => panic!("Expected PeerConnected event"),
3446        }
3447
3448        // Disconnect from peer (this should emit another event)
3449        node1.disconnect_peer(&peer_id).await?;
3450
3451        // Check for PeerDisconnected event
3452        let event = timeout(Duration::from_secs(2), events.recv()).await;
3453        assert!(event.is_ok());
3454
3455        let event_result = event
3456            .expect("Should receive event")
3457            .expect("Event should not be error");
3458        match event_result {
3459            P2PEvent::PeerDisconnected(event_peer_id) => {
3460                assert_eq!(event_peer_id, peer_id);
3461            }
3462            _ => panic!("Expected PeerDisconnected event"),
3463        }
3464
3465        node1.stop().await?;
3466        node2.stop().await?;
3467
3468        Ok(())
3469    }
3470
3471    // TODO(windows): Same QUIC connection issues as test_event_subscription
3472    #[cfg_attr(target_os = "windows", ignore)]
3473    #[tokio::test]
3474    async fn test_message_sending() -> Result<()> {
3475        // Create two nodes
3476        let mut config1 = create_test_node_config();
3477        config1.listen_addr =
3478            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
3479        let node1 = P2PNode::new(config1).await?;
3480        node1.start().await?;
3481
3482        let mut config2 = create_test_node_config();
3483        config2.listen_addr =
3484            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
3485        let node2 = P2PNode::new(config2).await?;
3486        node2.start().await?;
3487
3488        // Wait a bit for nodes to start listening
3489        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
3490
3491        // Get actual listening address of node2
3492        let node2_addr = node2.local_addr().ok_or_else(|| {
3493            P2PError::Network(crate::error::NetworkError::ProtocolError(
3494                "No listening address".to_string().into(),
3495            ))
3496        })?;
3497
3498        // Connect node1 to node2
3499        let peer_id =
3500            match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
3501                Ok(res) => res?,
3502                Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
3503            };
3504
3505        // Wait a bit for connection to establish
3506        tokio::time::sleep(tokio::time::Duration::from_millis(30)).await;
3507
3508        // Send a message
3509        let message_data = b"Hello, peer!".to_vec();
3510        let result = match timeout(
3511            Duration::from_millis(500),
3512            node1.send_message(&peer_id, "test-protocol", message_data),
3513        )
3514        .await
3515        {
3516            Ok(res) => res,
3517            Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
3518        };
3519        // For now, we'll just check that we don't get a "not connected" error
3520        // The actual send might fail due to no handler on the other side
3521        if let Err(e) = &result {
3522            assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
3523        }
3524
3525        // Try to send to non-existent peer
3526        let non_existent_peer = "non_existent_peer".to_string();
3527        let result = node1
3528            .send_message(&non_existent_peer, "test-protocol", vec![])
3529            .await;
3530        assert!(result.is_err(), "Sending to non-existent peer should fail");
3531
3532        Ok(())
3533    }
3534
3535    #[tokio::test]
3536    async fn test_remote_mcp_operations() -> Result<()> {
3537        let config = create_test_node_config();
3538        let node = P2PNode::new(config).await?;
3539
3540        // MCP removed; test reduced to simple start/stop
3541        node.start().await?;
3542        node.stop().await?;
3543        Ok(())
3544    }
3545
3546    #[tokio::test]
3547    async fn test_health_check() -> Result<()> {
3548        let config = create_test_node_config();
3549        let node = P2PNode::new(config).await?;
3550
3551        // Health check should pass with no connections
3552        let result = node.health_check().await;
3553        assert!(result.is_ok());
3554
3555        // Note: We're not actually connecting to real peers here
3556        // since that would require running bootstrap nodes.
3557        // The health check should still pass with no connections.
3558
3559        Ok(())
3560    }
3561
3562    #[tokio::test]
3563    async fn test_node_uptime() -> Result<()> {
3564        let config = create_test_node_config();
3565        let node = P2PNode::new(config).await?;
3566
3567        let uptime1 = node.uptime();
3568        assert!(uptime1 >= Duration::from_secs(0));
3569
3570        // Wait a bit
3571        tokio::time::sleep(Duration::from_millis(10)).await;
3572
3573        let uptime2 = node.uptime();
3574        assert!(uptime2 > uptime1);
3575
3576        Ok(())
3577    }
3578
3579    #[tokio::test]
3580    async fn test_node_config_access() -> Result<()> {
3581        let config = create_test_node_config();
3582        let expected_peer_id = config.peer_id.clone();
3583        let node = P2PNode::new(config).await?;
3584
3585        let node_config = node.config();
3586        assert_eq!(node_config.peer_id, expected_peer_id);
3587        assert_eq!(node_config.max_connections, 100);
3588        // MCP removed
3589
3590        Ok(())
3591    }
3592
3593    #[tokio::test]
3594    async fn test_mcp_server_access() -> Result<()> {
3595        let config = create_test_node_config();
3596        let _node = P2PNode::new(config).await?;
3597
3598        // MCP removed
3599        Ok(())
3600    }
3601
3602    #[tokio::test]
3603    async fn test_dht_access() -> Result<()> {
3604        let config = create_test_node_config();
3605        let node = P2PNode::new(config).await?;
3606
3607        // Should have DHT
3608        assert!(node.dht().is_some());
3609
3610        Ok(())
3611    }
3612
3613    #[tokio::test]
3614    async fn test_node_builder() -> Result<()> {
3615        // Create a config using the builder but don't actually build a real node
3616        let builder = P2PNode::builder()
3617            .with_peer_id("builder_test_peer".to_string())
3618            .listen_on("/ip4/127.0.0.1/tcp/0")
3619            .listen_on("/ip6/::1/tcp/0")
3620            .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9000") // Use a valid port number
3621            .with_ipv6(true)
3622            .with_connection_timeout(Duration::from_secs(15))
3623            .with_max_connections(200);
3624
3625        // Test the configuration that was built
3626        let config = builder.config;
3627        assert_eq!(config.peer_id, Some("builder_test_peer".to_string()));
3628        assert_eq!(config.listen_addrs.len(), 2); // 2 added by builder (no defaults)
3629        assert_eq!(config.bootstrap_peers_str.len(), 1); // Check bootstrap_peers_str instead
3630        assert!(config.enable_ipv6);
3631        assert_eq!(config.connection_timeout, Duration::from_secs(15));
3632        assert_eq!(config.max_connections, 200);
3633
3634        Ok(())
3635    }
3636
3637    #[tokio::test]
3638    async fn test_bootstrap_peers() -> Result<()> {
3639        let mut config = create_test_node_config();
3640        config.bootstrap_peers = vec![
3641            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9200),
3642            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9201),
3643        ];
3644
3645        let node = P2PNode::new(config).await?;
3646
3647        // Start node (which attempts to connect to bootstrap peers)
3648        node.start().await?;
3649
3650        // In a test environment, bootstrap peers may not be available
3651        // The test verifies the node starts correctly with bootstrap configuration
3652        // Peer count may include local/internal tracking, so we just verify it's reasonable
3653        let _peer_count = node.peer_count().await;
3654
3655        node.stop().await?;
3656        Ok(())
3657    }
3658
3659    #[tokio::test]
3660    async fn test_production_mode_disabled() -> Result<()> {
3661        let config = create_test_node_config();
3662        let node = P2PNode::new(config).await?;
3663
3664        assert!(!node.is_production_mode());
3665        assert!(node.production_config().is_none());
3666
3667        // Resource metrics should fail when production mode is disabled
3668        let result = node.resource_metrics().await;
3669        assert!(result.is_err());
3670        assert!(result.unwrap_err().to_string().contains("not enabled"));
3671
3672        Ok(())
3673    }
3674
3675    #[tokio::test]
3676    async fn test_network_event_variants() {
3677        // Test that all network event variants can be created
3678        let peer_id = "test_peer".to_string();
3679        let address = "/ip4/127.0.0.1/tcp/9000".to_string();
3680
3681        let _peer_connected = NetworkEvent::PeerConnected {
3682            peer_id: peer_id.clone(),
3683            addresses: vec![address.clone()],
3684        };
3685
3686        let _peer_disconnected = NetworkEvent::PeerDisconnected {
3687            peer_id: peer_id.clone(),
3688            reason: "test disconnect".to_string(),
3689        };
3690
3691        let _message_received = NetworkEvent::MessageReceived {
3692            peer_id: peer_id.clone(),
3693            protocol: "test-protocol".to_string(),
3694            data: vec![1, 2, 3],
3695        };
3696
3697        let _connection_failed = NetworkEvent::ConnectionFailed {
3698            peer_id: Some(peer_id.clone()),
3699            address: address.clone(),
3700            error: "connection refused".to_string(),
3701        };
3702
3703        let _dht_stored = NetworkEvent::DHTRecordStored {
3704            key: vec![1, 2, 3],
3705            value: vec![4, 5, 6],
3706        };
3707
3708        let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
3709            key: vec![1, 2, 3],
3710            value: Some(vec![4, 5, 6]),
3711        };
3712    }
3713
3714    #[tokio::test]
3715    async fn test_peer_info_structure() {
3716        let peer_info = PeerInfo {
3717            peer_id: "test_peer".to_string(),
3718            addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
3719            connected_at: Instant::now(),
3720            last_seen: Instant::now(),
3721            status: ConnectionStatus::Connected,
3722            protocols: vec!["test-protocol".to_string()],
3723            heartbeat_count: 0,
3724        };
3725
3726        assert_eq!(peer_info.peer_id, "test_peer");
3727        assert_eq!(peer_info.addresses.len(), 1);
3728        assert_eq!(peer_info.status, ConnectionStatus::Connected);
3729        assert_eq!(peer_info.protocols.len(), 1);
3730    }
3731
3732    #[tokio::test]
3733    async fn test_serialization() -> Result<()> {
3734        // Test that configs can be serialized/deserialized
3735        let config = create_test_node_config();
3736        let serialized = serde_json::to_string(&config)?;
3737        let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
3738
3739        assert_eq!(config.peer_id, deserialized.peer_id);
3740        assert_eq!(config.listen_addrs, deserialized.listen_addrs);
3741        assert_eq!(config.enable_ipv6, deserialized.enable_ipv6);
3742
3743        Ok(())
3744    }
3745
3746    #[tokio::test]
3747    async fn test_get_peer_id_by_address_found() -> Result<()> {
3748        let config = create_test_node_config();
3749        let node = P2PNode::new(config).await?;
3750
3751        // Manually insert a peer for testing
3752        let test_peer_id = "peer_test_123".to_string();
3753        let test_address = "192.168.1.100:9000".to_string();
3754
3755        let peer_info = PeerInfo {
3756            peer_id: test_peer_id.clone(),
3757            addresses: vec![test_address.clone()],
3758            connected_at: Instant::now(),
3759            last_seen: Instant::now(),
3760            status: ConnectionStatus::Connected,
3761            protocols: vec!["test-protocol".to_string()],
3762            heartbeat_count: 0,
3763        };
3764
3765        node.peers
3766            .write()
3767            .await
3768            .insert(test_peer_id.clone(), peer_info);
3769
3770        // Test: Find peer by address
3771        let found_peer_id = node.get_peer_id_by_address(&test_address).await;
3772        assert_eq!(found_peer_id, Some(test_peer_id));
3773
3774        Ok(())
3775    }
3776
3777    #[tokio::test]
3778    async fn test_get_peer_id_by_address_not_found() -> Result<()> {
3779        let config = create_test_node_config();
3780        let node = P2PNode::new(config).await?;
3781
3782        // Test: Try to find a peer that doesn't exist
3783        let result = node.get_peer_id_by_address("192.168.1.200:9000").await;
3784        assert_eq!(result, None);
3785
3786        Ok(())
3787    }
3788
3789    #[tokio::test]
3790    async fn test_get_peer_id_by_address_invalid_format() -> Result<()> {
3791        let config = create_test_node_config();
3792        let node = P2PNode::new(config).await?;
3793
3794        // Test: Invalid address format should return None
3795        let result = node.get_peer_id_by_address("invalid-address").await;
3796        assert_eq!(result, None);
3797
3798        Ok(())
3799    }
3800
3801    #[tokio::test]
3802    async fn test_get_peer_id_by_address_multiple_peers() -> Result<()> {
3803        let config = create_test_node_config();
3804        let node = P2PNode::new(config).await?;
3805
3806        // Add multiple peers with different addresses
3807        let peer1_id = "peer_1".to_string();
3808        let peer1_addr = "192.168.1.101:9001".to_string();
3809
3810        let peer2_id = "peer_2".to_string();
3811        let peer2_addr = "192.168.1.102:9002".to_string();
3812
3813        let peer1_info = PeerInfo {
3814            peer_id: peer1_id.clone(),
3815            addresses: vec![peer1_addr.clone()],
3816            connected_at: Instant::now(),
3817            last_seen: Instant::now(),
3818            status: ConnectionStatus::Connected,
3819            protocols: vec!["test-protocol".to_string()],
3820            heartbeat_count: 0,
3821        };
3822
3823        let peer2_info = PeerInfo {
3824            peer_id: peer2_id.clone(),
3825            addresses: vec![peer2_addr.clone()],
3826            connected_at: Instant::now(),
3827            last_seen: Instant::now(),
3828            status: ConnectionStatus::Connected,
3829            protocols: vec!["test-protocol".to_string()],
3830            heartbeat_count: 0,
3831        };
3832
3833        node.peers
3834            .write()
3835            .await
3836            .insert(peer1_id.clone(), peer1_info);
3837        node.peers
3838            .write()
3839            .await
3840            .insert(peer2_id.clone(), peer2_info);
3841
3842        // Test: Find each peer by their unique address
3843        let found_peer1 = node.get_peer_id_by_address(&peer1_addr).await;
3844        let found_peer2 = node.get_peer_id_by_address(&peer2_addr).await;
3845
3846        assert_eq!(found_peer1, Some(peer1_id));
3847        assert_eq!(found_peer2, Some(peer2_id));
3848
3849        Ok(())
3850    }
3851
3852    #[tokio::test]
3853    async fn test_list_active_connections_empty() -> Result<()> {
3854        let config = create_test_node_config();
3855        let node = P2PNode::new(config).await?;
3856
3857        // Test: No connections initially
3858        let connections = node.list_active_connections().await;
3859        assert!(connections.is_empty());
3860
3861        Ok(())
3862    }
3863
3864    #[tokio::test]
3865    async fn test_list_active_connections_with_peers() -> Result<()> {
3866        let config = create_test_node_config();
3867        let node = P2PNode::new(config).await?;
3868
3869        // Add multiple peers
3870        let peer1_id = "peer_1".to_string();
3871        let peer1_addrs = vec![
3872            "192.168.1.101:9001".to_string(),
3873            "192.168.1.101:9002".to_string(),
3874        ];
3875
3876        let peer2_id = "peer_2".to_string();
3877        let peer2_addrs = vec!["192.168.1.102:9003".to_string()];
3878
3879        let peer1_info = PeerInfo {
3880            peer_id: peer1_id.clone(),
3881            addresses: peer1_addrs.clone(),
3882            connected_at: Instant::now(),
3883            last_seen: Instant::now(),
3884            status: ConnectionStatus::Connected,
3885            protocols: vec!["test-protocol".to_string()],
3886            heartbeat_count: 0,
3887        };
3888
3889        let peer2_info = PeerInfo {
3890            peer_id: peer2_id.clone(),
3891            addresses: peer2_addrs.clone(),
3892            connected_at: Instant::now(),
3893            last_seen: Instant::now(),
3894            status: ConnectionStatus::Connected,
3895            protocols: vec!["test-protocol".to_string()],
3896            heartbeat_count: 0,
3897        };
3898
3899        node.peers
3900            .write()
3901            .await
3902            .insert(peer1_id.clone(), peer1_info);
3903        node.peers
3904            .write()
3905            .await
3906            .insert(peer2_id.clone(), peer2_info);
3907
3908        // Also add to active_connections (list_active_connections iterates over this)
3909        node.active_connections
3910            .write()
3911            .await
3912            .insert(peer1_id.clone());
3913        node.active_connections
3914            .write()
3915            .await
3916            .insert(peer2_id.clone());
3917
3918        // Test: List all active connections
3919        let connections = node.list_active_connections().await;
3920        assert_eq!(connections.len(), 2);
3921
3922        // Verify peer1 and peer2 are in the list
3923        let peer1_conn = connections.iter().find(|(id, _)| id == &peer1_id);
3924        let peer2_conn = connections.iter().find(|(id, _)| id == &peer2_id);
3925
3926        assert!(peer1_conn.is_some());
3927        assert!(peer2_conn.is_some());
3928
3929        // Verify addresses match
3930        assert_eq!(peer1_conn.unwrap().1, peer1_addrs);
3931        assert_eq!(peer2_conn.unwrap().1, peer2_addrs);
3932
3933        Ok(())
3934    }
3935
3936    #[tokio::test]
3937    async fn test_remove_peer_success() -> Result<()> {
3938        let config = create_test_node_config();
3939        let node = P2PNode::new(config).await?;
3940
3941        // Add a peer
3942        let peer_id = "peer_to_remove".to_string();
3943        let peer_info = PeerInfo {
3944            peer_id: peer_id.clone(),
3945            addresses: vec!["192.168.1.100:9000".to_string()],
3946            connected_at: Instant::now(),
3947            last_seen: Instant::now(),
3948            status: ConnectionStatus::Connected,
3949            protocols: vec!["test-protocol".to_string()],
3950            heartbeat_count: 0,
3951        };
3952
3953        node.peers.write().await.insert(peer_id.clone(), peer_info);
3954
3955        // Verify peer exists
3956        assert!(node.is_peer_connected(&peer_id).await);
3957
3958        // Remove the peer
3959        let removed = node.remove_peer(&peer_id).await;
3960        assert!(removed);
3961
3962        // Verify peer no longer exists
3963        assert!(!node.is_peer_connected(&peer_id).await);
3964
3965        Ok(())
3966    }
3967
3968    #[tokio::test]
3969    async fn test_remove_peer_nonexistent() -> Result<()> {
3970        let config = create_test_node_config();
3971        let node = P2PNode::new(config).await?;
3972
3973        // Try to remove a peer that doesn't exist
3974        let removed = node.remove_peer(&"nonexistent_peer".to_string()).await;
3975        assert!(!removed);
3976
3977        Ok(())
3978    }
3979
3980    #[tokio::test]
3981    async fn test_is_peer_connected() -> Result<()> {
3982        let config = create_test_node_config();
3983        let node = P2PNode::new(config).await?;
3984
3985        let peer_id = "test_peer".to_string();
3986
3987        // Initially not connected
3988        assert!(!node.is_peer_connected(&peer_id).await);
3989
3990        // Add peer
3991        let peer_info = PeerInfo {
3992            peer_id: peer_id.clone(),
3993            addresses: vec!["192.168.1.100:9000".to_string()],
3994            connected_at: Instant::now(),
3995            last_seen: Instant::now(),
3996            status: ConnectionStatus::Connected,
3997            protocols: vec!["test-protocol".to_string()],
3998            heartbeat_count: 0,
3999        };
4000
4001        node.peers.write().await.insert(peer_id.clone(), peer_info);
4002
4003        // Now connected
4004        assert!(node.is_peer_connected(&peer_id).await);
4005
4006        // Remove peer
4007        node.remove_peer(&peer_id).await;
4008
4009        // No longer connected
4010        assert!(!node.is_peer_connected(&peer_id).await);
4011
4012        Ok(())
4013    }
4014
4015    #[test]
4016    fn test_normalize_ipv6_wildcard() {
4017        use std::net::{IpAddr, Ipv6Addr, SocketAddr};
4018
4019        let wildcard = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 8080);
4020        let normalized = normalize_wildcard_to_loopback(wildcard);
4021
4022        assert_eq!(normalized.ip(), IpAddr::V6(Ipv6Addr::LOCALHOST));
4023        assert_eq!(normalized.port(), 8080);
4024    }
4025
4026    #[test]
4027    fn test_normalize_ipv4_wildcard() {
4028        use std::net::{IpAddr, Ipv4Addr, SocketAddr};
4029
4030        let wildcard = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9000);
4031        let normalized = normalize_wildcard_to_loopback(wildcard);
4032
4033        assert_eq!(normalized.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
4034        assert_eq!(normalized.port(), 9000);
4035    }
4036
4037    #[test]
4038    fn test_normalize_specific_address_unchanged() {
4039        let specific: std::net::SocketAddr = "192.168.1.100:3000".parse().unwrap();
4040        let normalized = normalize_wildcard_to_loopback(specific);
4041
4042        assert_eq!(normalized, specific);
4043    }
4044
4045    #[test]
4046    fn test_normalize_loopback_unchanged() {
4047        use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
4048
4049        let loopback_v6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5000);
4050        let normalized_v6 = normalize_wildcard_to_loopback(loopback_v6);
4051        assert_eq!(normalized_v6, loopback_v6);
4052
4053        let loopback_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
4054        let normalized_v4 = normalize_wildcard_to_loopback(loopback_v4);
4055        assert_eq!(normalized_v4, loopback_v4);
4056    }
4057}