Skip to main content

saorsa_core/
network.rs

// Copyright 2024 Saorsa Labs Limited
//
// This software is dual-licensed under:
// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
// - Commercial License
//
// For AGPL-3.0 license, see LICENSE-AGPL-3.0
// For commercial licensing, contact: david@saorsalabs.com
//
// Unless required by applicable law or agreed to in writing, software
// distributed under these licenses is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

//! Network module
//!
//! This module provides core networking functionality for the P2P Foundation.
//! It handles peer connections, network events, and node lifecycle management.
18
19use crate::bgp_geo_provider::BgpGeoProvider;
20use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
21use crate::config::Config;
22use crate::dht::DHT;
23use crate::error::{NetworkError, P2PError, P2pResult as Result};
24use crate::security::GeoProvider;
25
26use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
27use crate::transport::ant_quic_adapter::{DualStackNetworkNode, ant_peer_id_to_string};
28#[allow(unused_imports)] // Temporarily unused during migration
29use crate::transport::{TransportOptions, TransportType};
30use crate::validation::RateLimitConfig;
31use crate::validation::RateLimiter;
32use crate::{NetworkAddress, PeerId};
33use serde::{Deserialize, Serialize};
34use std::collections::{HashMap, HashSet};
35use std::sync::Arc;
36use std::sync::atomic::{AtomicBool, Ordering};
37use std::time::Duration;
38use tokio::sync::{RwLock, broadcast};
39use tokio::time::{Instant, interval};
40use tracing::{debug, error, info, trace, warn};
41
42/// Wire protocol message format for P2P communication.
43///
44/// Serialized with bincode for compact binary encoding.
45/// Replaces the previous JSON format for better performance
46/// and smaller wire size.
47#[derive(Debug, Clone, Serialize, Deserialize)]
48struct WireMessage {
49    /// Protocol/topic identifier
50    protocol: String,
51    /// Raw payload bytes
52    data: Vec<u8>,
53    /// Sender's peer ID (verified against transport-level identity)
54    from: String,
55    /// Unix timestamp in seconds
56    timestamp: u64,
57}
58
/// Body of the keepalive messages sent so idle connections are not timed out.
const KEEPALIVE_PAYLOAD: &[u8] = "keepalive".as_bytes();

/// Bound of the mpsc channel fed by each per-stack receive task (256 slots).
const RECV_CHANNEL_CAPACITY: usize = 1 << 8;
64
65/// Configuration for a P2P node
66#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct NodeConfig {
68    /// Local peer ID for this node
69    pub peer_id: Option<PeerId>,
70
71    /// Addresses to listen on for incoming connections
72    pub listen_addrs: Vec<std::net::SocketAddr>,
73
74    /// Primary listen address (for compatibility)
75    pub listen_addr: std::net::SocketAddr,
76
77    /// Bootstrap peers to connect to on startup (legacy)
78    pub bootstrap_peers: Vec<std::net::SocketAddr>,
79
80    /// Bootstrap peers as strings (for integration tests)
81    pub bootstrap_peers_str: Vec<String>,
82
83    /// Enable IPv6 support
84    pub enable_ipv6: bool,
85
86    // MCP removed; will be redesigned later
87    /// Connection timeout duration
88    pub connection_timeout: Duration,
89
90    /// Keep-alive interval for connections
91    pub keep_alive_interval: Duration,
92
93    /// Maximum number of concurrent connections
94    pub max_connections: usize,
95
96    /// Maximum number of incoming connections
97    pub max_incoming_connections: usize,
98
99    /// DHT configuration
100    pub dht_config: DHTConfig,
101
102    /// Security configuration
103    pub security_config: SecurityConfig,
104
105    /// Production hardening configuration
106    pub production_config: Option<ProductionConfig>,
107
108    /// Bootstrap cache configuration
109    pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,
110
111    /// Optional IP diversity configuration for Sybil protection tuning.
112    ///
113    /// When set, this configuration is used by bootstrap peer discovery and
114    /// other diversity-enforcing subsystems. If `None`, defaults are used.
115    pub diversity_config: Option<crate::security::IPDiversityConfig>,
116
117    /// Attestation configuration for software integrity verification.
118    ///
119    /// Controls how nodes verify each other's software attestation during handshake.
120    /// In Phase 1, this is used for "soft enforcement" (logging only).
121    #[serde(default)]
122    pub attestation_config: crate::attestation::AttestationConfig,
123
124    /// Stale peer threshold - peers with no activity for this duration are considered stale.
125    /// Defaults to 60 seconds. Can be reduced for testing purposes.
126    #[serde(default = "default_stale_peer_threshold")]
127    pub stale_peer_threshold: Duration,
128}
129
/// Default for `NodeConfig::stale_peer_threshold`: one minute.
///
/// Used both by the serde `default` attribute and by the constructors.
fn default_stale_peer_threshold() -> Duration {
    const ONE_MINUTE_SECS: u64 = 60;
    Duration::from_secs(ONE_MINUTE_SECS)
}
134
135/// DHT-specific configuration
136#[derive(Debug, Clone, Serialize, Deserialize)]
137pub struct DHTConfig {
138    /// Kademlia K parameter (bucket size)
139    pub k_value: usize,
140
141    /// Kademlia alpha parameter (parallelism)
142    pub alpha_value: usize,
143
144    /// DHT record TTL
145    pub record_ttl: Duration,
146
147    /// DHT refresh interval
148    pub refresh_interval: Duration,
149}
150
151/// Security configuration
152#[derive(Debug, Clone, Serialize, Deserialize)]
153pub struct SecurityConfig {
154    /// Enable noise protocol for encryption
155    pub enable_noise: bool,
156
157    /// Enable TLS for secure transport
158    pub enable_tls: bool,
159
160    /// Trust level for peer verification
161    pub trust_level: TrustLevel,
162}
163
164/// Trust level for peer verification
165#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
166pub enum TrustLevel {
167    /// No verification required
168    None,
169    /// Basic peer ID verification
170    Basic,
171    /// Full cryptographic verification
172    Full,
173}
174
175// ============================================================================
176// Address Construction Helpers
177// ============================================================================
178
/// Compute the wildcard socket addresses a node listens on for `port`.
///
/// The IPv4 wildcard (`0.0.0.0:port`) is always present. When `ipv6_enabled`
/// is true, the IPv6 wildcard (`[::]:port`) comes first, ahead of it.
///
/// Shared by `NodeConfig::new`, `NodeConfig::default` and
/// `NodeConfigBuilder::build` so they agree on the address layout.
#[inline]
fn build_listen_addrs(port: u16, ipv6_enabled: bool) -> Vec<std::net::SocketAddr> {
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};

    let ipv6_wildcard =
        ipv6_enabled.then(|| SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), port));
    let ipv4_wildcard = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port);

    ipv6_wildcard
        .into_iter()
        .chain(std::iter::once(ipv4_wildcard))
        .collect()
}
200
201impl NodeConfig {
202    /// Create a new NodeConfig with default values
203    ///
204    /// # Errors
205    ///
206    /// Returns an error if default addresses cannot be parsed
207    pub fn new() -> Result<Self> {
208        let config = Config::default();
209        let listen_addr = config.listen_socket_addr()?;
210
211        Ok(Self {
212            peer_id: None,
213            listen_addrs: build_listen_addrs(listen_addr.port(), config.network.ipv6_enabled),
214            listen_addr,
215            bootstrap_peers: Vec::new(),
216            bootstrap_peers_str: config.network.bootstrap_nodes.clone(),
217            enable_ipv6: config.network.ipv6_enabled,
218            connection_timeout: Duration::from_secs(config.network.connection_timeout),
219            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
220            max_connections: config.network.max_connections,
221            max_incoming_connections: config.security.connection_limit as usize,
222            dht_config: DHTConfig::default(),
223            security_config: SecurityConfig::default(),
224            production_config: None,
225            bootstrap_cache_config: None,
226            diversity_config: None,
227            attestation_config: config.attestation.clone(),
228            stale_peer_threshold: default_stale_peer_threshold(),
229        })
230    }
231
232    /// Create a builder for customized NodeConfig construction
233    pub fn builder() -> NodeConfigBuilder {
234        NodeConfigBuilder::default()
235    }
236}
237
238// ============================================================================
239// NodeConfig Builder Pattern
240// ============================================================================
241
242/// Builder for constructing NodeConfig with fluent API
243#[derive(Debug, Clone, Default)]
244pub struct NodeConfigBuilder {
245    peer_id: Option<PeerId>,
246    listen_port: Option<u16>,
247    enable_ipv6: Option<bool>,
248    bootstrap_peers: Vec<std::net::SocketAddr>,
249    max_connections: Option<usize>,
250    connection_timeout: Option<Duration>,
251    keep_alive_interval: Option<Duration>,
252    dht_config: Option<DHTConfig>,
253    security_config: Option<SecurityConfig>,
254    production_config: Option<ProductionConfig>,
255}
256
257impl NodeConfigBuilder {
258    /// Set the peer ID
259    pub fn peer_id(mut self, peer_id: PeerId) -> Self {
260        self.peer_id = Some(peer_id);
261        self
262    }
263
264    /// Set the listen port
265    pub fn listen_port(mut self, port: u16) -> Self {
266        self.listen_port = Some(port);
267        self
268    }
269
270    /// Enable or disable IPv6
271    pub fn ipv6(mut self, enabled: bool) -> Self {
272        self.enable_ipv6 = Some(enabled);
273        self
274    }
275
276    /// Add a bootstrap peer
277    pub fn bootstrap_peer(mut self, addr: std::net::SocketAddr) -> Self {
278        self.bootstrap_peers.push(addr);
279        self
280    }
281
282    /// Set maximum connections
283    pub fn max_connections(mut self, max: usize) -> Self {
284        self.max_connections = Some(max);
285        self
286    }
287
288    /// Set connection timeout
289    pub fn connection_timeout(mut self, timeout: Duration) -> Self {
290        self.connection_timeout = Some(timeout);
291        self
292    }
293
294    /// Set keep-alive interval
295    pub fn keep_alive_interval(mut self, interval: Duration) -> Self {
296        self.keep_alive_interval = Some(interval);
297        self
298    }
299
300    /// Set DHT configuration
301    pub fn dht_config(mut self, config: DHTConfig) -> Self {
302        self.dht_config = Some(config);
303        self
304    }
305
306    /// Set security configuration
307    pub fn security_config(mut self, config: SecurityConfig) -> Self {
308        self.security_config = Some(config);
309        self
310    }
311
312    /// Set production configuration
313    pub fn production_config(mut self, config: ProductionConfig) -> Self {
314        self.production_config = Some(config);
315        self
316    }
317
318    /// Build the NodeConfig
319    ///
320    /// # Errors
321    ///
322    /// Returns an error if address construction fails
323    pub fn build(self) -> Result<NodeConfig> {
324        let base_config = Config::default();
325        let default_port = base_config
326            .listen_socket_addr()
327            .map(|addr| addr.port())
328            .unwrap_or(9000);
329        let port = self.listen_port.unwrap_or(default_port);
330        let ipv6_enabled = self.enable_ipv6.unwrap_or(base_config.network.ipv6_enabled);
331
332        let listen_addr =
333            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), port);
334
335        Ok(NodeConfig {
336            peer_id: self.peer_id,
337            listen_addrs: build_listen_addrs(port, ipv6_enabled),
338            listen_addr,
339            bootstrap_peers: self.bootstrap_peers.clone(),
340            bootstrap_peers_str: self.bootstrap_peers.iter().map(|a| a.to_string()).collect(),
341            enable_ipv6: ipv6_enabled,
342            connection_timeout: self
343                .connection_timeout
344                .unwrap_or(Duration::from_secs(base_config.network.connection_timeout)),
345            keep_alive_interval: self
346                .keep_alive_interval
347                .unwrap_or(Duration::from_secs(base_config.network.keepalive_interval)),
348            max_connections: self
349                .max_connections
350                .unwrap_or(base_config.network.max_connections),
351            max_incoming_connections: base_config.security.connection_limit as usize,
352            dht_config: self.dht_config.unwrap_or_default(),
353            security_config: self.security_config.unwrap_or_default(),
354            production_config: self.production_config,
355            bootstrap_cache_config: None,
356            diversity_config: None,
357            attestation_config: base_config.attestation.clone(),
358            stale_peer_threshold: default_stale_peer_threshold(),
359        })
360    }
361}
362
363impl Default for NodeConfig {
364    fn default() -> Self {
365        let config = Config::default();
366        let listen_addr = config.listen_socket_addr().unwrap_or_else(|_| {
367            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 9000)
368        });
369
370        Self {
371            peer_id: None,
372            listen_addrs: build_listen_addrs(listen_addr.port(), config.network.ipv6_enabled),
373            listen_addr,
374            bootstrap_peers: Vec::new(),
375            bootstrap_peers_str: Vec::new(),
376            enable_ipv6: config.network.ipv6_enabled,
377            connection_timeout: Duration::from_secs(config.network.connection_timeout),
378            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
379            max_connections: config.network.max_connections,
380            max_incoming_connections: config.security.connection_limit as usize,
381            dht_config: DHTConfig::default(),
382            security_config: SecurityConfig::default(),
383            production_config: None,
384            bootstrap_cache_config: None,
385            diversity_config: None,
386            attestation_config: config.attestation.clone(),
387            stale_peer_threshold: default_stale_peer_threshold(),
388        }
389    }
390}
391
392impl NodeConfig {
393    /// Create NodeConfig from Config
394    pub fn from_config(config: &Config) -> Result<Self> {
395        let listen_addr = config.listen_socket_addr()?;
396        let bootstrap_addrs = config.bootstrap_addrs()?;
397
398        let mut node_config = Self {
399            peer_id: None,
400            listen_addrs: vec![listen_addr],
401            listen_addr,
402            bootstrap_peers: bootstrap_addrs
403                .iter()
404                .map(|addr| addr.socket_addr())
405                .collect(),
406            bootstrap_peers_str: config
407                .network
408                .bootstrap_nodes
409                .iter()
410                .map(|addr| addr.to_string())
411                .collect(),
412            enable_ipv6: config.network.ipv6_enabled,
413
414            connection_timeout: Duration::from_secs(config.network.connection_timeout),
415            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
416            max_connections: config.network.max_connections,
417            max_incoming_connections: config.security.connection_limit as usize,
418            dht_config: DHTConfig {
419                k_value: 20,
420                alpha_value: 3,
421                record_ttl: Duration::from_secs(3600),
422                refresh_interval: Duration::from_secs(900),
423            },
424            security_config: SecurityConfig {
425                enable_noise: true,
426                enable_tls: true,
427                trust_level: TrustLevel::Basic,
428            },
429            production_config: Some(ProductionConfig {
430                max_connections: config.network.max_connections,
431                max_memory_bytes: 0,  // unlimited
432                max_bandwidth_bps: 0, // unlimited
433                connection_timeout: Duration::from_secs(config.network.connection_timeout),
434                keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
435                health_check_interval: Duration::from_secs(30),
436                metrics_interval: Duration::from_secs(60),
437                enable_performance_tracking: true,
438                enable_auto_cleanup: true,
439                shutdown_timeout: Duration::from_secs(30),
440                rate_limits: crate::production::RateLimitConfig::default(),
441            }),
442            bootstrap_cache_config: None,
443            diversity_config: None,
444            attestation_config: config.attestation.clone(),
445            stale_peer_threshold: default_stale_peer_threshold(),
446        };
447
448        // Add IPv6 listen address if enabled
449        if config.network.ipv6_enabled {
450            node_config.listen_addrs.push(std::net::SocketAddr::new(
451                std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
452                listen_addr.port(),
453            ));
454        }
455
456        Ok(node_config)
457    }
458
459    /// Try to build a NodeConfig from a listen address string
460    pub fn with_listen_addr(addr: &str) -> Result<Self> {
461        let listen_addr: std::net::SocketAddr = addr
462            .parse()
463            .map_err(|e: std::net::AddrParseError| {
464                NetworkError::InvalidAddress(e.to_string().into())
465            })
466            .map_err(P2PError::Network)?;
467        let cfg = NodeConfig {
468            listen_addr,
469            listen_addrs: vec![listen_addr],
470            diversity_config: None,
471            ..Default::default()
472        };
473        Ok(cfg)
474    }
475}
476
477impl Default for DHTConfig {
478    fn default() -> Self {
479        Self {
480            k_value: 20,
481            alpha_value: 5,
482            record_ttl: Duration::from_secs(3600), // 1 hour
483            refresh_interval: Duration::from_secs(600), // 10 minutes
484        }
485    }
486}
487
488impl Default for SecurityConfig {
489    fn default() -> Self {
490        Self {
491            enable_noise: true,
492            enable_tls: true,
493            trust_level: TrustLevel::Basic,
494        }
495    }
496}
497
498/// Information about a connected peer
499#[derive(Debug, Clone)]
500pub struct PeerInfo {
501    /// Peer identifier
502    pub peer_id: PeerId,
503
504    /// Peer's addresses
505    pub addresses: Vec<String>,
506
507    /// Connection timestamp
508    pub connected_at: Instant,
509
510    /// Last seen timestamp
511    pub last_seen: Instant,
512
513    /// Connection status
514    pub status: ConnectionStatus,
515
516    /// Supported protocols
517    pub protocols: Vec<String>,
518
519    /// Number of heartbeats received
520    pub heartbeat_count: u64,
521}
522
/// Lifecycle state of the connection to a peer.
///
/// Every payload (the `String` in `Failed`) has total equality, so the enum
/// derives `Eq` in addition to `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionStatus {
    /// The connection is being established.
    Connecting,
    /// The connection is established and active.
    Connected,
    /// The connection is being closed.
    Disconnecting,
    /// The connection is closed.
    Disconnected,
    /// The connection failed; carries a description of the failure.
    Failed(String),
}
537
538/// Network events that can occur
539#[derive(Debug, Clone)]
540pub enum NetworkEvent {
541    /// A new peer has connected
542    PeerConnected {
543        /// The identifier of the newly connected peer
544        peer_id: PeerId,
545        /// The network addresses where the peer can be reached
546        addresses: Vec<String>,
547    },
548
549    /// A peer has disconnected
550    PeerDisconnected {
551        /// The identifier of the disconnected peer
552        peer_id: PeerId,
553        /// The reason for the disconnection
554        reason: String,
555    },
556
557    /// A message was received from a peer
558    MessageReceived {
559        /// The identifier of the sending peer
560        peer_id: PeerId,
561        /// The protocol used for the message
562        protocol: String,
563        /// The raw message data
564        data: Vec<u8>,
565    },
566
567    /// A connection attempt failed
568    ConnectionFailed {
569        /// The identifier of the peer (if known)
570        peer_id: Option<PeerId>,
571        /// The address where connection was attempted
572        address: String,
573        /// The error message describing the failure
574        error: String,
575    },
576
577    /// DHT record was stored
578    DHTRecordStored {
579        /// The DHT key where the record was stored
580        key: Vec<u8>,
581        /// The value that was stored
582        value: Vec<u8>,
583    },
584
585    /// DHT record was retrieved
586    DHTRecordRetrieved {
587        /// The DHT key that was queried
588        key: Vec<u8>,
589        /// The retrieved value, if found
590        value: Option<Vec<u8>>,
591    },
592}
593
594/// Network events that can occur in the P2P system
595///
596/// Events are broadcast to all listeners and provide real-time
597/// notifications of network state changes and message arrivals.
598#[derive(Debug, Clone)]
599pub enum P2PEvent {
600    /// Message received from a peer on a specific topic
601    Message {
602        /// Topic or channel the message was sent on
603        topic: String,
604        /// Peer ID of the message sender
605        source: PeerId,
606        /// Raw message data payload
607        data: Vec<u8>,
608    },
609    /// A new peer has connected to the network
610    PeerConnected(PeerId),
611    /// A peer has disconnected from the network
612    PeerDisconnected(PeerId),
613}
614
615/// Main P2P node structure
616/// Main P2P network node that manages connections, routing, and communication
617///
618/// This struct represents a complete P2P network participant that can:
619/// - Connect to other peers via QUIC transport
620/// - Participate in distributed hash table (DHT) operations
621/// - Send and receive messages through various protocols
622/// - Handle network events and peer lifecycle
623/// - Provide MCP (Model Context Protocol) services
624pub struct P2PNode {
625    /// Node configuration
626    config: NodeConfig,
627
628    /// Our peer ID
629    peer_id: PeerId,
630
631    /// Connected peers
632    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
633
634    /// Network event broadcaster
635    event_tx: broadcast::Sender<P2PEvent>,
636
637    /// Listen addresses
638    listen_addrs: RwLock<Vec<std::net::SocketAddr>>,
639
640    /// Node start time
641    start_time: Instant,
642
643    /// Running state
644    running: RwLock<bool>,
645
646    /// DHT instance (optional)
647    dht: Option<Arc<RwLock<DHT>>>,
648
649    /// Production resource manager (optional)
650    resource_manager: Option<Arc<ResourceManager>>,
651
652    /// Bootstrap cache manager for peer discovery
653    bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,
654
655    /// Dual-stack ant-quic nodes (IPv6 + IPv4) with Happy Eyeballs dialing
656    dual_node: Arc<DualStackNetworkNode>,
657
658    /// Rate limiter for connection and request throttling
659    #[allow(dead_code)]
660    rate_limiter: Arc<RateLimiter>,
661
662    /// Active connections (tracked by peer_id)
663    /// This set is synchronized with ant-quic's connection state via event monitoring
664    active_connections: Arc<RwLock<HashSet<PeerId>>>,
665
666    /// Security dashboard for monitoring
667    pub security_dashboard: Option<Arc<crate::dht::metrics::SecurityDashboard>>,
668
669    /// Connection lifecycle monitor task handle
670    #[allow(dead_code)]
671    connection_monitor_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
672
673    /// Keepalive task handle
674    #[allow(dead_code)]
675    keepalive_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
676
677    /// Periodic maintenance task handle
678    #[allow(dead_code)]
679    periodic_tasks_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
680
681    /// Shutdown flag for background tasks
682    shutdown: Arc<AtomicBool>,
683
684    /// Message receive task handles (recv tasks + consumer)
685    recv_handles: Arc<RwLock<Vec<tokio::task::JoinHandle<()>>>>,
686
687    /// Accept loop task handle for graceful shutdown
688    listener_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
689
690    /// GeoIP provider for connection validation
691    #[allow(dead_code)]
692    geo_provider: Arc<BgpGeoProvider>,
693
694    /// This node's entangled identity (derived from public key + binary hash + nonce).
695    /// Used for software attestation verification during peer handshake.
696    entangled_id: Option<crate::attestation::EntangledId>,
697
698    /// BLAKE3 hash of the running binary for attestation.
699    /// In production, this is computed at startup from the executable.
700    binary_hash: [u8; 32],
701}
702
/// Map a wildcard bind address to the loopback address of the same family.
///
/// Unspecified addresses (`0.0.0.0` and `[::]`) are only valid for binding,
/// and ant-quic rejects them as connection targets. For local connections the
/// IP is therefore rewritten to loopback, and the port is kept:
///
/// - `[::]:port`    → `[::1]:port`
/// - `0.0.0.0:port` → `127.0.0.1:port`
///
/// Any other address is returned unchanged.
fn normalize_wildcard_to_loopback(addr: std::net::SocketAddr) -> std::net::SocketAddr {
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};

    if !addr.ip().is_unspecified() {
        return addr;
    }

    let loopback = match addr.ip() {
        IpAddr::V4(_) => IpAddr::V4(Ipv4Addr::LOCALHOST),
        IpAddr::V6(_) => IpAddr::V6(Ipv6Addr::LOCALHOST),
    };
    SocketAddr::new(loopback, addr.port())
}
733
734impl P2PNode {
    /// Build a `P2PNode` for tests with most subsystems left out.
    ///
    /// Unlike [`P2PNode::new`], this creates no DHT, security dashboard,
    /// resource manager, bootstrap manager or attestation identity; those
    /// fields are `None`. The peer ID is the fixed string `"test_peer"`, the
    /// config is `NodeConfig::default()`, and `binary_hash` is all zeroes.
    ///
    /// This is NOT free of networking. It binds a real dual-stack transport on
    /// ephemeral loopback ports (`[::1]:0` and `127.0.0.1:0`). If that fails,
    /// it retries with an IPv4-only bind on `127.0.0.1:0`.
    ///
    /// NOTE(review): binding goes through `tokio::runtime::Handle::current()`
    /// and `Handle::block_on`. Per the tokio docs:
    /// - `Handle::current()` panics when no runtime is entered;
    /// - `Handle::block_on` panics if called from inside an async task.
    ///
    /// Callers therefore presumably invoke this from synchronous code within a
    /// runtime context. Confirm against the test call sites.
    ///
    /// # Errors
    ///
    /// Returns `P2PError::Network(NetworkError::BindError(_))` when the
    /// IPv4-only fallback bind also fails. The error from the first
    /// (dual-stack) attempt is discarded.
    pub fn new_for_tests() -> Result<Self> {
        let (event_tx, _) = broadcast::channel(16);
        Ok(Self {
            config: NodeConfig::default(),
            peer_id: "test_peer".to_string(),
            peers: Arc::new(RwLock::new(HashMap::new())),
            event_tx,
            listen_addrs: RwLock::new(Vec::new()),
            start_time: Instant::now(),
            running: RwLock::new(false),
            dht: None,
            resource_manager: None,
            bootstrap_manager: None,
            dual_node: {
                // Bind dual-stack nodes on ephemeral ports (port 0) for tests.
                // NOTE(review): "[::1]:0" is a valid literal, so `.parse().ok()` is
                // always `Some` and the `.or(...)` fallback is never taken. If it
                // were taken, it would put an IPv4 address in the IPv6 slot.
                let v6: Option<std::net::SocketAddr> = "[::1]:0"
                    .parse()
                    .ok()
                    .or(Some(std::net::SocketAddr::from(([0, 0, 0, 0], 0))));
                let v4: Option<std::net::SocketAddr> = "127.0.0.1:0".parse().ok();
                // Drive the async constructor to completion from this sync fn.
                let handle = tokio::runtime::Handle::current();
                let dual_attempt = handle.block_on(
                    crate::transport::ant_quic_adapter::DualStackNetworkNode::new(v6, v4),
                );
                let dual = match dual_attempt {
                    Ok(d) => d,
                    Err(_e1) => {
                        // Fallback to IPv4-only ephemeral bind; the first error is dropped.
                        let fallback = handle.block_on(
                            crate::transport::ant_quic_adapter::DualStackNetworkNode::new(
                                None,
                                "127.0.0.1:0".parse().ok(),
                            ),
                        );
                        match fallback {
                            Ok(d) => d,
                            Err(e2) => {
                                // Both binds failed: surface the fallback error to the caller.
                                return Err(P2PError::Network(NetworkError::BindError(
                                    format!("Failed to create dual-stack network node: {}", e2)
                                        .into(),
                                )));
                            }
                        }
                    }
                };
                Arc::new(dual)
            },
            rate_limiter: Arc::new(RateLimiter::new(RateLimitConfig {
                max_requests: 100,
                burst_size: 100,
                window: std::time::Duration::from_secs(1),
                ..Default::default()
            })),
            active_connections: Arc::new(RwLock::new(HashSet::new())),
            // No background tasks are spawned here, so every task handle starts empty.
            connection_monitor_handle: Arc::new(RwLock::new(None)),
            keepalive_handle: Arc::new(RwLock::new(None)),
            periodic_tasks_handle: Arc::new(RwLock::new(None)),
            shutdown: Arc::new(AtomicBool::new(false)),
            recv_handles: Arc::new(RwLock::new(Vec::new())),
            listener_handle: Arc::new(RwLock::new(None)),
            geo_provider: Arc::new(BgpGeoProvider::new()),
            security_dashboard: None,
            // Attestation fields - use dummy values for tests
            entangled_id: None,
            binary_hash: [0u8; 32],
        })
    }
803    /// Create a new P2P node with the given configuration
804    pub async fn new(config: NodeConfig) -> Result<Self> {
805        let peer_id = config.peer_id.clone().unwrap_or_else(|| {
806            // Generate a random peer ID for now
807            format!("peer_{}", &uuid::Uuid::new_v4().to_string()[..8])
808        });
809
810        let (event_tx, _) = broadcast::channel(1000);
811
812        // Initialize and register a TrustWeightedKademlia DHT for the global API
813        // Use a deterministic local NodeId derived from the peer_id
814        {
815            use blake3::Hasher;
816            let mut hasher = Hasher::new();
817            hasher.update(peer_id.as_bytes());
818            let digest = hasher.finalize();
819            let mut nid = [0u8; 32];
820            nid.copy_from_slice(digest.as_bytes());
821            let _twdht = std::sync::Arc::new(crate::dht::TrustWeightedKademlia::new(
822                crate::identity::node_identity::NodeId::from_bytes(nid),
823            ));
824            // TODO: Update to use new clean API
825            // let _ = crate::api::set_dht_instance(twdht);
826        }
827
828        // Initialize DHT if needed
829        let (dht, security_dashboard) = if true {
830            // Assuming DHT is always enabled for now, or check config
831            let _dht_config = crate::dht::DHTConfig {
832                replication_factor: config.dht_config.k_value,
833                bucket_size: config.dht_config.k_value,
834                alpha: config.dht_config.alpha_value,
835                record_ttl: config.dht_config.record_ttl,
836                bucket_refresh_interval: config.dht_config.refresh_interval,
837                republish_interval: config.dht_config.refresh_interval,
838                max_distance: 160,
839            };
840            // Convert peer_id String to NodeId
841            let peer_bytes = peer_id.as_bytes();
842            let mut node_id_bytes = [0u8; 32];
843            let len = peer_bytes.len().min(32);
844            node_id_bytes[..len].copy_from_slice(&peer_bytes[..len]);
845            let node_id = crate::dht::core_engine::NodeId::from_bytes(node_id_bytes);
846            let dht_instance = DHT::new(node_id).map_err(|e| {
847                crate::error::P2PError::Dht(crate::error::DhtError::StoreFailed(
848                    e.to_string().into(),
849                ))
850            })?;
851            dht_instance.start_maintenance_tasks();
852
853            // Create Security Dashboard
854            let security_metrics = dht_instance.security_metrics();
855            let dashboard = crate::dht::metrics::SecurityDashboard::new(
856                security_metrics,
857                Arc::new(crate::dht::metrics::DhtMetricsCollector::new()),
858                Arc::new(crate::dht::metrics::TrustMetricsCollector::new()),
859                Arc::new(crate::dht::metrics::PlacementMetricsCollector::new()),
860            );
861
862            (
863                Some(Arc::new(RwLock::new(dht_instance))),
864                Some(Arc::new(dashboard)),
865            )
866        } else {
867            (None, None)
868        };
869
870        // MCP removed
871
872        // Initialize production resource manager if configured
873        let resource_manager = config
874            .production_config
875            .clone()
876            .map(|prod_config| Arc::new(ResourceManager::new(prod_config)));
877
878        // Initialize bootstrap cache manager
879        let diversity_config = config.diversity_config.clone().unwrap_or_default();
880        let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
881            match BootstrapManager::with_full_config(
882                cache_config.clone(),
883                crate::rate_limit::JoinRateLimiterConfig::default(),
884                diversity_config.clone(),
885            )
886            .await
887            {
888                Ok(manager) => Some(Arc::new(RwLock::new(manager))),
889                Err(e) => {
890                    warn!(
891                        "Failed to initialize bootstrap manager: {}, continuing without cache",
892                        e
893                    );
894                    None
895                }
896            }
897        } else {
898            match BootstrapManager::with_full_config(
899                crate::bootstrap::CacheConfig::default(),
900                crate::rate_limit::JoinRateLimiterConfig::default(),
901                diversity_config,
902            )
903            .await
904            {
905                Ok(manager) => Some(Arc::new(RwLock::new(manager))),
906                Err(e) => {
907                    warn!(
908                        "Failed to initialize bootstrap manager: {}, continuing without cache",
909                        e
910                    );
911                    None
912                }
913            }
914        };
915
916        // Initialize dual-stack ant-quic nodes
917        // Determine bind addresses
918        let (v6_opt, v4_opt) = {
919            let port = config.listen_addr.port();
920            let ip = config.listen_addr.ip();
921
922            let v4_addr = if ip.is_ipv4() {
923                Some(std::net::SocketAddr::new(ip, port))
924            } else {
925                // If config is IPv6, we still might want IPv4 on UNSPECIFIED if dual stack is desired
926                // But for now let's just stick to defaults if not specified
927                Some(std::net::SocketAddr::new(
928                    std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
929                    port,
930                ))
931            };
932
933            let v6_addr = if config.enable_ipv6 {
934                if ip.is_ipv6() {
935                    Some(std::net::SocketAddr::new(ip, port))
936                } else {
937                    Some(std::net::SocketAddr::new(
938                        std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
939                        port,
940                    ))
941                }
942            } else {
943                None
944            };
945            (v6_addr, v4_addr)
946        };
947
948        let dual_node = Arc::new(
949            DualStackNetworkNode::new_with_max_connections(v6_opt, v4_opt, config.max_connections)
950                .await
951                .map_err(|e| {
952                    P2PError::Transport(crate::error::TransportError::SetupFailed(
953                        format!("Failed to create dual-stack network nodes: {}", e).into(),
954                    ))
955                })?,
956        );
957
958        // Initialize rate limiter with default config
959        let rate_limiter = Arc::new(RateLimiter::new(
960            crate::validation::RateLimitConfig::default(),
961        ));
962
963        // Create active connections tracker
964        let active_connections = Arc::new(RwLock::new(HashSet::new()));
965
966        // Initialize GeoIP provider
967        let geo_provider = Arc::new(BgpGeoProvider::new());
968
969        // Create peers map
970        let peers = Arc::new(RwLock::new(HashMap::new()));
971
972        // Start connection lifecycle monitor
973        // CRITICAL: Subscribe to connection events BEFORE spawning the task
974        // to avoid race condition where early connections are missed
975        let connection_event_rx = dual_node.subscribe_connection_events();
976
977        let connection_monitor_handle = {
978            let active_conns = Arc::clone(&active_connections);
979            let peers_map = Arc::clone(&peers);
980            let event_tx_clone = event_tx.clone();
981            let dual_node_clone = Arc::clone(&dual_node);
982            let geo_provider_clone = Arc::clone(&geo_provider);
983            let peer_id_clone = peer_id.clone();
984
985            let handle = tokio::spawn(async move {
986                Self::connection_lifecycle_monitor_with_rx(
987                    dual_node_clone,
988                    connection_event_rx,
989                    active_conns,
990                    peers_map,
991                    event_tx_clone,
992                    geo_provider_clone,
993                    peer_id_clone,
994                )
995                .await;
996            });
997
998            Arc::new(RwLock::new(Some(handle)))
999        };
1000
1001        // Spawn keepalive task
1002        let shutdown = Arc::new(AtomicBool::new(false));
1003        let keepalive_handle = {
1004            let active_conns = Arc::clone(&active_connections);
1005            let dual_node_clone = Arc::clone(&dual_node);
1006            let shutdown_clone = Arc::clone(&shutdown);
1007
1008            let handle = tokio::spawn(async move {
1009                Self::keepalive_task(active_conns, dual_node_clone, shutdown_clone).await;
1010            });
1011
1012            Arc::new(RwLock::new(Some(handle)))
1013        };
1014
1015        // Spawn periodic maintenance task for stale peer detection
1016        let periodic_tasks_handle = {
1017            let peers_clone = Arc::clone(&peers);
1018            let active_conns_clone = Arc::clone(&active_connections);
1019            let event_tx_clone = event_tx.clone();
1020            let stale_threshold = config.stale_peer_threshold;
1021            let shutdown_clone = Arc::clone(&shutdown);
1022
1023            let handle = tokio::spawn(async move {
1024                Self::periodic_maintenance_task(
1025                    peers_clone,
1026                    active_conns_clone,
1027                    event_tx_clone,
1028                    stale_threshold,
1029                    shutdown_clone,
1030                )
1031                .await;
1032            });
1033
1034            Arc::new(RwLock::new(Some(handle)))
1035        };
1036
1037        // Compute binary hash for attestation (in production, this would be the actual binary)
1038        // For now, we use a placeholder that will be replaced during node initialization
1039        let binary_hash = Self::compute_binary_hash();
1040
1041        let node = Self {
1042            config,
1043            peer_id,
1044            peers,
1045            event_tx,
1046            listen_addrs: RwLock::new(Vec::new()),
1047            start_time: Instant::now(),
1048            running: RwLock::new(false),
1049            dht,
1050            resource_manager,
1051            bootstrap_manager,
1052            dual_node,
1053            rate_limiter,
1054            active_connections,
1055            security_dashboard,
1056            connection_monitor_handle,
1057            keepalive_handle,
1058            periodic_tasks_handle,
1059            shutdown,
1060            recv_handles: Arc::new(RwLock::new(Vec::new())),
1061            listener_handle: Arc::new(RwLock::new(None)),
1062            geo_provider,
1063            // Attestation - EntangledId will be derived later when NodeIdentity is available
1064            entangled_id: None,
1065            binary_hash,
1066        };
1067        info!(
1068            "Created P2P node with peer ID: {} (call start() to begin networking)",
1069            node.peer_id
1070        );
1071
1072        Ok(node)
1073    }
1074
1075    /// Create a new node builder
1076    pub fn builder() -> NodeBuilder {
1077        NodeBuilder::new()
1078    }
1079
1080    /// Get the peer ID of this node
1081    pub fn peer_id(&self) -> &PeerId {
1082        &self.peer_id
1083    }
1084
1085    /// Get the hex-encoded transport-level peer ID.
1086    ///
1087    /// This is the ID used in `P2PEvent::Message.source`, `connected_peers()`,
1088    /// and `send_message()`. It differs from `peer_id()` which is the app-level ID.
1089    ///
1090    /// Returns the transport peer ID from whichever stack (v4 or v6) is active.
1091    pub fn transport_peer_id(&self) -> Option<String> {
1092        if let Some(ref v4) = self.dual_node.v4 {
1093            return Some(ant_peer_id_to_string(&v4.our_peer_id()));
1094        }
1095        if let Some(ref v6) = self.dual_node.v6 {
1096            return Some(ant_peer_id_to_string(&v6.our_peer_id()));
1097        }
1098        None
1099    }
1100
1101    pub fn local_addr(&self) -> Option<String> {
1102        self.listen_addrs
1103            .try_read()
1104            .ok()
1105            .and_then(|addrs| addrs.first().map(|a| a.to_string()))
1106    }
1107
1108    pub async fn subscribe(&self, topic: &str) -> Result<()> {
1109        // In a real implementation, this would register the topic with the pubsub mechanism.
1110        // For now, we just log it.
1111        info!("Subscribed to topic: {}", topic);
1112        Ok(())
1113    }
1114
1115    pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
1116        info!(
1117            "Publishing message to topic: {} ({} bytes)",
1118            topic,
1119            data.len()
1120        );
1121
1122        // Get list of connected peers
1123        let peer_list: Vec<PeerId> = {
1124            let peers_guard = self.peers.read().await;
1125            peers_guard.keys().cloned().collect()
1126        };
1127
1128        if peer_list.is_empty() {
1129            debug!("No peers connected, message will only be sent to local subscribers");
1130        } else {
1131            // Send message to all connected peers
1132            let mut send_count = 0;
1133            for peer_id in &peer_list {
1134                match self.send_message(peer_id, topic, data.to_vec()).await {
1135                    Ok(_) => {
1136                        send_count += 1;
1137                        debug!("Sent message to peer: {}", peer_id);
1138                    }
1139                    Err(e) => {
1140                        warn!("Failed to send message to peer {}: {}", peer_id, e);
1141                    }
1142                }
1143            }
1144            info!(
1145                "Published message to {}/{} connected peers",
1146                send_count,
1147                peer_list.len()
1148            );
1149        }
1150
1151        // Also send to local subscribers (for local echo and testing)
1152        let event = P2PEvent::Message {
1153            topic: topic.to_string(),
1154            source: self.peer_id.clone(),
1155            data: data.to_vec(),
1156        };
1157        let _ = self.event_tx.send(event);
1158
1159        Ok(())
1160    }
1161
1162    /// Get the node configuration
1163    pub fn config(&self) -> &NodeConfig {
1164        &self.config
1165    }
1166
1167    /// Start the P2P node
1168    pub async fn start(&self) -> Result<()> {
1169        info!("Starting P2P node...");
1170
1171        // Start production resource manager if configured
1172        if let Some(ref resource_manager) = self.resource_manager {
1173            resource_manager.start().await.map_err(|e| {
1174                P2PError::Network(crate::error::NetworkError::ProtocolError(
1175                    format!("Failed to start resource manager: {e}").into(),
1176                ))
1177            })?;
1178            info!("Production resource manager started");
1179        }
1180
1181        // Start bootstrap manager background tasks
1182        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1183            let mut manager = bootstrap_manager.write().await;
1184            manager.start_background_tasks().await.map_err(|e| {
1185                P2PError::Network(crate::error::NetworkError::ProtocolError(
1186                    format!("Failed to start bootstrap manager: {e}").into(),
1187                ))
1188            })?;
1189            info!("Bootstrap cache manager started");
1190        }
1191
1192        // Set running state
1193        *self.running.write().await = true;
1194
1195        // Start listening on configured addresses using transport layer
1196        self.start_network_listeners().await?;
1197
1198        // Update the connection monitor with actual peers reference
1199        self.start_connection_monitor().await;
1200
1201        // Log current listen addresses
1202        let listen_addrs = self.listen_addrs.read().await;
1203        info!("P2P node started on addresses: {:?}", *listen_addrs);
1204
1205        // Start message receiving system
1206        self.start_message_receiving_system().await?;
1207
1208        // Connect to bootstrap peers
1209        self.connect_bootstrap_peers().await?;
1210
1211        Ok(())
1212    }
1213
1214    /// Start network listeners on configured addresses
1215    async fn start_network_listeners(&self) -> Result<()> {
1216        info!("Starting dual-stack listeners (ant-quic)...");
1217        // Update our listen_addrs from the dual node bindings
1218        let addrs = self.dual_node.local_addrs().await.map_err(|e| {
1219            P2PError::Transport(crate::error::TransportError::SetupFailed(
1220                format!("Failed to get local addresses: {}", e).into(),
1221            ))
1222        })?;
1223        {
1224            let mut la = self.listen_addrs.write().await;
1225            *la = addrs.clone();
1226        }
1227
1228        // Spawn a background accept loop that handles incoming connections from either stack
1229        let event_tx = self.event_tx.clone();
1230        let peers = self.peers.clone();
1231        let active_connections = self.active_connections.clone();
1232        let rate_limiter = self.rate_limiter.clone();
1233        let dual = self.dual_node.clone();
1234        let shutdown = Arc::clone(&self.shutdown);
1235        let handle = tokio::spawn(async move {
1236            loop {
1237                if shutdown.load(Ordering::Relaxed) {
1238                    break;
1239                }
1240                match dual.accept_any().await {
1241                    Ok((ant_peer_id, remote_sock)) => {
1242                        // Enforce rate limiting — skip peer registration when rate-limited
1243                        if let Err(e) = rate_limiter.check_ip(&remote_sock.ip()) {
1244                            warn!(
1245                                "Rate-limited incoming connection from {}: {}",
1246                                remote_sock, e
1247                            );
1248                            continue;
1249                        }
1250
1251                        debug!(
1252                            "Accepted connection from {} (protocol validation pending ant-quic implementation)",
1253                            remote_sock
1254                        );
1255
1256                        let peer_id = ant_peer_id_to_string(&ant_peer_id);
1257                        let remote_addr = NetworkAddress::from(remote_sock);
1258                        let _ = event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1259                        register_new_peer(&peers, &peer_id, &remote_addr).await;
1260                        active_connections.write().await.insert(peer_id);
1261                    }
1262                    Err(e) => {
1263                        warn!("Accept failed: {}", e);
1264                        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1265                    }
1266                }
1267            }
1268        });
1269        *self.listener_handle.write().await = Some(handle);
1270
1271        info!("Dual-stack listeners active on: {:?}", addrs);
1272        Ok(())
1273    }
1274
1275    /// Start the message receiving system with channel-based background tasks
1276    ///
1277    /// Spawns per-stack recv tasks that feed into a single mpsc channel,
1278    /// giving instant wakeup on data arrival without poll/timeout latency.
1279    async fn start_message_receiving_system(&self) -> Result<()> {
1280        info!("Starting message receiving system");
1281
1282        let (tx, mut rx) = tokio::sync::mpsc::channel(RECV_CHANNEL_CAPACITY);
1283        let shutdown = Arc::clone(&self.shutdown);
1284
1285        let mut handles = Vec::new();
1286
1287        // Spawn per-stack recv tasks — both feed into the same channel
1288        if let Some(v6) = self.dual_node.v6.as_ref() {
1289            handles.push(v6.spawn_recv_task(tx.clone(), Arc::clone(&shutdown)));
1290        }
1291        if let Some(v4) = self.dual_node.v4.as_ref() {
1292            handles.push(v4.spawn_recv_task(tx.clone(), Arc::clone(&shutdown)));
1293        }
1294        drop(tx); // drop original sender so rx closes when all tasks exit
1295
1296        let event_tx = self.event_tx.clone();
1297        handles.push(tokio::spawn(async move {
1298            info!("Message receive loop started");
1299            while let Some((peer_id, bytes)) = rx.recv().await {
1300                let transport_peer_id = ant_peer_id_to_string(&peer_id);
1301                info!(
1302                    "Received {} bytes from peer {}",
1303                    bytes.len(),
1304                    transport_peer_id
1305                );
1306
1307                if bytes == KEEPALIVE_PAYLOAD {
1308                    trace!("Received keepalive from {}", transport_peer_id);
1309                    continue;
1310                }
1311
1312                match parse_protocol_message(&bytes, &transport_peer_id) {
1313                    Some(event) => {
1314                        let _ = event_tx.send(event);
1315                    }
1316                    None => {
1317                        warn!("Failed to parse protocol message ({} bytes)", bytes.len());
1318                    }
1319                }
1320            }
1321            info!("Message receive loop ended — channel closed");
1322        }));
1323
1324        *self.recv_handles.write().await = handles;
1325
1326        Ok(())
1327    }
1328
1329    // MCP removed
1330
1331    // MCP removed
1332
1333    /// Run the P2P node (blocks until shutdown)
1334    pub async fn run(&self) -> Result<()> {
1335        if !*self.running.read().await {
1336            self.start().await?;
1337        }
1338
1339        info!("P2P node running...");
1340
1341        // Main event loop
1342        loop {
1343            if !*self.running.read().await {
1344                break;
1345            }
1346
1347            // Perform periodic tasks
1348            self.periodic_tasks().await?;
1349
1350            // Sleep for a short interval
1351            tokio::time::sleep(Duration::from_millis(100)).await;
1352        }
1353
1354        info!("P2P node stopped");
1355        Ok(())
1356    }
1357
1358    /// Stop the P2P node
1359    pub async fn stop(&self) -> Result<()> {
1360        info!("Stopping P2P node...");
1361
1362        // Signal all background tasks to stop
1363        self.shutdown.store(true, Ordering::Relaxed);
1364
1365        // Set running state to false
1366        *self.running.write().await = false;
1367
1368        // Shut down the transport endpoints first. This cancels the
1369        // endpoint CancellationToken which unblocks any recv() calls
1370        // and aborts per-connection reader tasks inside ant-quic.
1371        // Note: ant-quic >=0.20 races recv() against the shutdown token
1372        // internally, so this is belt-and-suspenders -- but calling it
1373        // first is still the safest ordering.
1374        self.dual_node.shutdown_endpoints().await;
1375
1376        // Await recv system tasks
1377        let handles: Vec<_> = self.recv_handles.write().await.drain(..).collect();
1378        for handle in handles {
1379            let _ = handle.await;
1380        }
1381
1382        // Await accept loop task
1383        if let Some(handle) = self.listener_handle.write().await.take() {
1384            let _ = handle.await;
1385        }
1386
1387        // Disconnect all peers
1388        self.disconnect_all_peers().await?;
1389
1390        // Shutdown production resource manager if configured
1391        if let Some(ref resource_manager) = self.resource_manager {
1392            resource_manager.shutdown().await.map_err(|e| {
1393                P2PError::Network(crate::error::NetworkError::ProtocolError(
1394                    format!("Failed to shutdown resource manager: {e}").into(),
1395                ))
1396            })?;
1397            info!("Production resource manager stopped");
1398        }
1399
1400        info!("P2P node stopped");
1401        Ok(())
1402    }
1403
1404    /// Graceful shutdown alias for tests
1405    pub async fn shutdown(&self) -> Result<()> {
1406        self.stop().await
1407    }
1408
1409    /// Check if the node is running
1410    pub async fn is_running(&self) -> bool {
1411        *self.running.read().await
1412    }
1413
1414    /// Get the current listen addresses
1415    pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
1416        self.listen_addrs.read().await.clone()
1417    }
1418
1419    /// Get connected peers
1420    pub async fn connected_peers(&self) -> Vec<PeerId> {
1421        // "Connected" is defined as currently active at the transport layer.
1422        // The peers map may contain historical peers with Disconnected/Failed status.
1423        self.active_connections
1424            .read()
1425            .await
1426            .iter()
1427            .cloned()
1428            .collect()
1429    }
1430
1431    /// Get peer count
1432    pub async fn peer_count(&self) -> usize {
1433        self.active_connections.read().await.len()
1434    }
1435
1436    /// Get peer info
1437    pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1438        self.peers.read().await.get(peer_id).cloned()
1439    }
1440
1441    /// Get the peer ID for a given socket address, if connected
1442    ///
1443    /// This method searches through all connected peers to find one that has
1444    /// the specified address in its address list.
1445    ///
1446    /// # Arguments
1447    /// * `addr` - The socket address to search for (e.g., "192.168.1.100:9000")
1448    ///
1449    /// # Returns
1450    /// * `Some(PeerId)` - The peer ID if a matching connected peer is found
1451    /// * `None` - If no peer with this address is currently connected
1452    pub async fn get_peer_id_by_address(&self, addr: &str) -> Option<PeerId> {
1453        // Parse the address to a SocketAddr for comparison
1454        let socket_addr: std::net::SocketAddr = addr.parse().ok()?;
1455
1456        let peers = self.peers.read().await;
1457
1458        // Search through all connected peers
1459        for (peer_id, peer_info) in peers.iter() {
1460            // Check if this peer has a matching address
1461            for peer_addr in &peer_info.addresses {
1462                if let Ok(peer_socket) = peer_addr.parse::<std::net::SocketAddr>()
1463                    && peer_socket == socket_addr
1464                {
1465                    return Some(peer_id.clone());
1466                }
1467            }
1468        }
1469
1470        None
1471    }
1472
1473    /// List all active connections with their peer IDs and addresses
1474    ///
1475    /// # Returns
1476    /// A vector of tuples containing (PeerId, Vec<String>) where the Vec<String>
1477    /// contains all known addresses for that peer.
1478    pub async fn list_active_connections(&self) -> Vec<(PeerId, Vec<String>)> {
1479        let active = self.active_connections.read().await;
1480        let peers = self.peers.read().await;
1481
1482        active
1483            .iter()
1484            .map(|peer_id| {
1485                let addresses = peers
1486                    .get(peer_id)
1487                    .map(|info| info.addresses.clone())
1488                    .unwrap_or_default();
1489                (peer_id.clone(), addresses)
1490            })
1491            .collect()
1492    }
1493
1494    /// Remove a peer from the peers map
1495    ///
1496    /// This method removes a peer from the internal peers map. It should be used
1497    /// when a connection is no longer valid (e.g., after detecting that the underlying
1498    /// ant-quic connection has closed).
1499    ///
1500    /// # Arguments
1501    /// * `peer_id` - The ID of the peer to remove
1502    ///
1503    /// # Returns
1504    /// `true` if the peer was found and removed, `false` if the peer was not in the map
1505    pub async fn remove_peer(&self, peer_id: &PeerId) -> bool {
1506        // Remove from active connections tracking
1507        self.active_connections.write().await.remove(peer_id);
1508        // Remove from peers map and return whether it existed
1509        self.peers.write().await.remove(peer_id).is_some()
1510    }
1511
1512    /// Check if a peer is connected
1513    ///
1514    /// This method checks if the peer ID exists in the peers map. Note that this
1515    /// only verifies the peer is registered - it does not guarantee the underlying
1516    /// ant-quic connection is still active. For connection validation, use `send_message`
1517    /// which will fail if the connection is closed.
1518    ///
1519    /// # Arguments
1520    /// * `peer_id` - The ID of the peer to check
1521    ///
1522    /// # Returns
1523    /// `true` if the peer exists in the peers map, `false` otherwise
1524    pub async fn is_peer_connected(&self, peer_id: &PeerId) -> bool {
1525        self.peers.read().await.contains_key(peer_id)
1526    }
1527
1528    /// Connect to a peer
1529    pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
1530        info!("Connecting to peer at: {}", address);
1531
1532        // Check production limits if resource manager is enabled
1533        let _connection_guard = if let Some(ref resource_manager) = self.resource_manager {
1534            Some(resource_manager.acquire_connection().await?)
1535        } else {
1536            None
1537        };
1538
1539        // Parse the address to SocketAddr format
1540        let socket_addr: std::net::SocketAddr = address.parse().map_err(|e| {
1541            P2PError::Network(crate::error::NetworkError::InvalidAddress(
1542                format!("{}: {}", address, e).into(),
1543            ))
1544        })?;
1545
1546        // Normalize wildcard addresses to loopback for local connections
1547        // This converts [::]:port → ::1:port and 0.0.0.0:port → 127.0.0.1:port
1548        let normalized_addr = normalize_wildcard_to_loopback(socket_addr);
1549        if normalized_addr != socket_addr {
1550            info!(
1551                "Normalized wildcard address {} to loopback {}",
1552                socket_addr, normalized_addr
1553            );
1554        }
1555
1556        // Establish a real connection via dual-stack Happy Eyeballs, but cap the wait
1557        let addr_list = vec![normalized_addr];
1558        let peer_id = match tokio::time::timeout(
1559            self.config.connection_timeout,
1560            self.dual_node.connect_happy_eyeballs(&addr_list),
1561        )
1562        .await
1563        {
1564            Ok(Ok(peer)) => {
1565                let connected_peer_id = ant_peer_id_to_string(&peer);
1566                info!("Successfully connected to peer: {}", connected_peer_id);
1567
1568                // Prevent self-connections by checking if remote peer_id matches our own
1569                if connected_peer_id == self.peer_id {
1570                    warn!(
1571                        "Detected self-connection to own address {} (peer_id: {}), rejecting",
1572                        address, connected_peer_id
1573                    );
1574                    // Actively close the QUIC connection instead of leaking it
1575                    // until idle timeout
1576                    self.dual_node.disconnect_peer(&peer).await;
1577                    return Err(P2PError::Network(
1578                        crate::error::NetworkError::InvalidAddress(
1579                            format!("Cannot connect to self ({})", address).into(),
1580                        ),
1581                    ));
1582                }
1583
1584                connected_peer_id
1585            }
1586            Ok(Err(e)) => {
1587                warn!("Failed to connect to peer at {}: {}", address, e);
1588                return Err(P2PError::Transport(
1589                    crate::error::TransportError::ConnectionFailed {
1590                        addr: normalized_addr,
1591                        reason: e.to_string().into(),
1592                    },
1593                ));
1594            }
1595            Err(_) => {
1596                warn!(
1597                    "Timed out connecting to peer at {} after {:?}",
1598                    address, self.config.connection_timeout
1599                );
1600                return Err(P2PError::Timeout(self.config.connection_timeout));
1601            }
1602        };
1603
1604        // Create peer info with connection details
1605        let peer_info = PeerInfo {
1606            peer_id: peer_id.clone(),
1607            addresses: vec![address.to_string()],
1608            connected_at: Instant::now(),
1609            last_seen: Instant::now(),
1610            status: ConnectionStatus::Connected,
1611            protocols: vec!["p2p-foundation/1.0".to_string()],
1612            heartbeat_count: 0,
1613        };
1614
1615        // Store peer information
1616        self.peers.write().await.insert(peer_id.clone(), peer_info);
1617
1618        // Add to active connections tracking
1619        // This is critical for is_connection_active() to work correctly
1620        self.active_connections
1621            .write()
1622            .await
1623            .insert(peer_id.clone());
1624
1625        // Record bandwidth usage if resource manager is enabled
1626        if let Some(ref resource_manager) = self.resource_manager {
1627            resource_manager.record_bandwidth(0, 0); // Placeholder for handshake data
1628        }
1629
1630        // Emit connection event
1631        let _ = self.event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1632
1633        info!("Connected to peer: {}", peer_id);
1634        Ok(peer_id)
1635    }
1636
1637    /// Disconnect from a peer
1638    ///
1639    /// Actively closes the underlying QUIC connection via
1640    /// `P2pEndpoint::disconnect()` and removes the peer from all
1641    /// local tracking maps.
1642    pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1643        info!("Disconnecting from peer: {}", peer_id);
1644
1645        // Close the actual QUIC connection so the remote side is notified
1646        // immediately rather than waiting for idle timeout.
1647        self.dual_node.disconnect_peer_string(peer_id).await.ok();
1648
1649        // Remove from active connections
1650        self.active_connections.write().await.remove(peer_id);
1651
1652        if let Some(mut peer_info) = self.peers.write().await.remove(peer_id) {
1653            peer_info.status = ConnectionStatus::Disconnected;
1654
1655            // Emit event
1656            let _ = self
1657                .event_tx
1658                .send(P2PEvent::PeerDisconnected(peer_id.clone()));
1659
1660            info!("Disconnected from peer: {}", peer_id);
1661        }
1662
1663        Ok(())
1664    }
1665
1666    /// Check if a connection to a peer is active
1667    pub async fn is_connection_active(&self, peer_id: &str) -> bool {
1668        self.active_connections.read().await.contains(peer_id)
1669    }
1670
1671    /// Send a message to a peer
1672    pub async fn send_message(
1673        &self,
1674        peer_id: &PeerId,
1675        protocol: &str,
1676        data: Vec<u8>,
1677    ) -> Result<()> {
1678        debug!(
1679            "Sending message to peer {} on protocol {}",
1680            peer_id, protocol
1681        );
1682
1683        // Check rate limits if resource manager is enabled
1684        if let Some(ref resource_manager) = self.resource_manager
1685            && !resource_manager
1686                .check_rate_limit(peer_id, "message")
1687                .await?
1688        {
1689            return Err(P2PError::ResourceExhausted(
1690                format!("Rate limit exceeded for peer {}", peer_id).into(),
1691            ));
1692        }
1693
1694        // Check if peer exists in peers map
1695        if !self.peers.read().await.contains_key(peer_id) {
1696            return Err(P2PError::Network(crate::error::NetworkError::PeerNotFound(
1697                peer_id.to_string().into(),
1698            )));
1699        }
1700
1701        // **NEW**: Check if the ant-quic connection is actually active
1702        // This is the critical fix for the connection state synchronization issue
1703        if !self.is_connection_active(peer_id).await {
1704            debug!(
1705                "Connection to peer {} exists in peers map but ant-quic connection is closed",
1706                peer_id
1707            );
1708
1709            // Clean up stale peer entry
1710            self.remove_peer(peer_id).await;
1711
1712            return Err(P2PError::Network(
1713                crate::error::NetworkError::ConnectionClosed {
1714                    peer_id: peer_id.to_string().into(),
1715                },
1716            ));
1717        }
1718
1719        // MCP removed: no special-case protocol validation
1720
1721        // Record bandwidth usage if resource manager is enabled
1722        if let Some(ref resource_manager) = self.resource_manager {
1723            resource_manager.record_bandwidth(data.len() as u64, 0);
1724        }
1725
1726        // Create protocol message wrapper
1727        let raw_data_len = data.len();
1728        let _message_data = self.create_protocol_message(protocol, data)?;
1729        info!(
1730            "Sending {} bytes to peer {} on protocol {} (raw data: {} bytes)",
1731            _message_data.len(),
1732            peer_id,
1733            protocol,
1734            raw_data_len
1735        );
1736
1737        // Send via ant-quic dual-node using the optimized send method
1738        // This uses P2pEndpoint::send() which corresponds with recv() for proper
1739        // bidirectional communication
1740        let send_fut = self
1741            .dual_node
1742            .send_to_peer_string_optimized(peer_id, &_message_data);
1743        let result = tokio::time::timeout(self.config.connection_timeout, send_fut)
1744            .await
1745            .map_err(|_| {
1746                P2PError::Transport(crate::error::TransportError::StreamError(
1747                    "Timed out sending message".into(),
1748                ))
1749            })?
1750            .map_err(|e| {
1751                P2PError::Transport(crate::error::TransportError::StreamError(
1752                    e.to_string().into(),
1753                ))
1754            });
1755
1756        if result.is_ok() {
1757            info!(
1758                "Successfully sent {} bytes to peer {}",
1759                _message_data.len(),
1760                peer_id
1761            );
1762        } else {
1763            warn!("Failed to send message to peer {}", peer_id);
1764        }
1765
1766        result
1767    }
1768
1769    /// Create a protocol message wrapper
1770    fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1771        let timestamp = std::time::SystemTime::now()
1772            .duration_since(std::time::UNIX_EPOCH)
1773            .map_err(|e| {
1774                P2PError::Network(NetworkError::ProtocolError(
1775                    format!("System time error: {}", e).into(),
1776                ))
1777            })?
1778            .as_secs();
1779
1780        let message = WireMessage {
1781            protocol: protocol.to_string(),
1782            data,
1783            from: self.peer_id.clone(),
1784            timestamp,
1785        };
1786
1787        postcard::to_stdvec(&message).map_err(|e| {
1788            P2PError::Transport(crate::error::TransportError::StreamError(
1789                format!("Failed to serialize message: {e}").into(),
1790            ))
1791        })
1792    }
1793
1794    // Note: async listen_addrs() already exists above for fetching listen addresses
1795}
1796
1797/// Parse a postcard-encoded protocol message into a `P2PEvent::Message`.
1798///
1799/// Returns `None` if the bytes cannot be deserialized as a valid `WireMessage`.
1800///
1801/// The `from` field is a required part of the wire protocol but is **not**
1802/// used as the event source.  Instead, `source` — the transport-level peer ID
1803/// derived from the authenticated QUIC connection — is used so that consumers
1804/// can pass it directly to `send_message()`.  This eliminates a spoofing
1805/// vector where a peer could claim an arbitrary identity via the payload.
1806fn parse_protocol_message(bytes: &[u8], source: &str) -> Option<P2PEvent> {
1807    let message: WireMessage = postcard::from_bytes(bytes).ok()?;
1808
1809    debug!(
1810        "Parsed P2PEvent::Message - topic: {}, source: {} (logical: {}), payload_len: {}",
1811        message.protocol,
1812        source,
1813        message.from,
1814        message.data.len()
1815    );
1816
1817    Some(P2PEvent::Message {
1818        topic: message.protocol,
1819        source: source.to_string(),
1820        data: message.data,
1821    })
1822}
1823
1824impl P2PNode {
1825    /// Subscribe to network events
1826    pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1827        self.event_tx.subscribe()
1828    }
1829
1830    /// Backwards-compat event stream accessor for tests
1831    pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1832        self.subscribe_events()
1833    }
1834
1835    /// Get node uptime
1836    pub fn uptime(&self) -> Duration {
1837        self.start_time.elapsed()
1838    }
1839
1840    // =========================================================================
    // Attestation Methods (soft and hard enforcement modes)
1842    // =========================================================================
1843
1844    /// Compute the BLAKE3 hash of the running binary.
1845    ///
1846    /// In production, this reads the actual executable file and hashes it.
1847    /// Returns a placeholder hash if the binary cannot be read.
1848    fn compute_binary_hash() -> [u8; 32] {
1849        // Try to get the path to the current executable and hash it
1850        if let Some(hash) = std::env::current_exe()
1851            .ok()
1852            .and_then(|exe_path| std::fs::read(&exe_path).ok())
1853            .map(|binary_data| blake3::hash(&binary_data))
1854        {
1855            return *hash.as_bytes();
1856        }
1857        // Fallback: return a deterministic placeholder based on compile-time info
1858        // This allows tests and development to work without actual binary hashing
1859        let placeholder = format!(
1860            "saorsa-core-v{}-{}",
1861            env!("CARGO_PKG_VERSION"),
1862            std::env::consts::ARCH
1863        );
1864        let hash = blake3::hash(placeholder.as_bytes());
1865        *hash.as_bytes()
1866    }
1867
1868    /// Get this node's binary hash used for attestation.
1869    #[must_use]
1870    pub fn binary_hash(&self) -> &[u8; 32] {
1871        &self.binary_hash
1872    }
1873
1874    /// Get this node's entangled identity, if set.
1875    #[must_use]
1876    pub fn entangled_id(&self) -> Option<&crate::attestation::EntangledId> {
1877        self.entangled_id.as_ref()
1878    }
1879
1880    /// Set the entangled identity for this node.
1881    ///
1882    /// This should be called after the node's cryptographic identity is established,
1883    /// typically by deriving from the NodeIdentity's public key.
1884    pub fn set_entangled_id(&mut self, entangled_id: crate::attestation::EntangledId) {
1885        self.entangled_id = Some(entangled_id);
1886    }
1887
1888    /// Verify a peer's attestation and return the enforcement decision.
1889    ///
1890    /// This function implements the Entangled Attestation verification protocol
1891    /// (Phase 6: Hard Enforcement). Based on the configured enforcement mode:
1892    ///
1893    /// - **Off**: Skips verification entirely
1894    /// - **Soft**: Logs warnings but allows connections
1895    /// - **Hard**: Rejects connections with invalid attestations
1896    ///
1897    /// # Arguments
1898    /// * `peer_id` - The peer's identifier for logging
1899    /// * `peer_entangled_id` - The peer's claimed entangled ID
1900    /// * `peer_public_key` - The peer's ML-DSA public key
1901    ///
1902    /// # Returns
1903    /// An [`EnforcementDecision`] indicating whether to allow or reject the connection.
1904    ///
1905    /// # Example
1906    /// ```rust,ignore
1907    /// let decision = node.verify_peer_attestation(peer_id, &entangled_id, &public_key);
1908    /// if decision.should_reject() {
1909    ///     // Send rejection message and close connection
1910    ///     if let Some(rejection) = decision.rejection() {
1911    ///         send_rejection(peer_id, rejection);
1912    ///     }
1913    ///     disconnect(peer_id);
1914    /// }
1915    /// ```
1916    pub fn verify_peer_attestation(
1917        &self,
1918        peer_id: &str,
1919        peer_entangled_id: &crate::attestation::EntangledId,
1920        peer_public_key: &crate::quantum_crypto::ant_quic_integration::MlDsaPublicKey,
1921    ) -> crate::attestation::EnforcementDecision {
1922        use crate::attestation::{
1923            AttestationRejection, AttestationRejectionReason, EnforcementDecision, EnforcementMode,
1924        };
1925
1926        let config = &self.config.attestation_config;
1927
1928        // Skip verification if attestation is disabled
1929        if !config.enabled {
1930            return EnforcementDecision::Skipped;
1931        }
1932
1933        // Verify the entangled ID derivation
1934        let id_valid = peer_entangled_id.verify(peer_public_key);
1935
1936        // Check binary hash allowlist (if configured)
1937        let binary_hash = *peer_entangled_id.binary_hash();
1938        let binary_allowed = config.is_binary_allowed(&binary_hash);
1939
1940        match config.enforcement_mode {
1941            EnforcementMode::Off => EnforcementDecision::Skipped,
1942
1943            EnforcementMode::Soft => {
1944                // Soft enforcement: log warnings but allow connections
1945                if !id_valid {
1946                    warn!(
1947                        peer = %peer_id,
1948                        binary_hash = %hex::encode(&binary_hash[..8]),
1949                        "Peer attestation verification failed: Invalid entangled ID (soft mode - allowing)"
1950                    );
1951                    return EnforcementDecision::AllowWithWarning {
1952                        reason: AttestationRejectionReason::IdentityMismatch,
1953                    };
1954                }
1955                if !binary_allowed {
1956                    warn!(
1957                        peer = %peer_id,
1958                        binary_hash = %hex::encode(binary_hash),
1959                        "Peer attestation verification failed: Binary not in allowlist (soft mode - allowing)"
1960                    );
1961                    return EnforcementDecision::AllowWithWarning {
1962                        reason: AttestationRejectionReason::BinaryNotAllowed { hash: binary_hash },
1963                    };
1964                }
1965                EnforcementDecision::Allow
1966            }
1967
1968            EnforcementMode::Hard => {
1969                // Hard enforcement: reject invalid attestations
1970                if !id_valid {
1971                    error!(
1972                        peer = %peer_id,
1973                        binary_hash = %hex::encode(&binary_hash[..8]),
1974                        "REJECTING peer: Invalid entangled ID derivation"
1975                    );
1976                    return EnforcementDecision::Reject {
1977                        rejection: AttestationRejection::identity_mismatch(),
1978                    };
1979                }
1980                if !binary_allowed {
1981                    error!(
1982                        peer = %peer_id,
1983                        binary_hash = %hex::encode(binary_hash),
1984                        "REJECTING peer: Binary not in allowlist"
1985                    );
1986                    return EnforcementDecision::Reject {
1987                        rejection: AttestationRejection::binary_not_allowed(binary_hash),
1988                    };
1989                }
1990
1991                info!(
1992                    peer = %peer_id,
1993                    entangled_id = %hex::encode(&peer_entangled_id.id()[..8]),
1994                    "Peer attestation verified successfully (hard mode)"
1995                );
1996                EnforcementDecision::Allow
1997            }
1998        }
1999    }
2000
2001    /// Verify a peer's attestation and return a simple boolean result.
2002    ///
2003    /// This is a convenience method that wraps [`verify_peer_attestation`] for cases
2004    /// where only a pass/fail result is needed without the detailed decision.
2005    ///
2006    /// # Returns
2007    /// `true` if the connection should be allowed, `false` if it should be rejected.
2008    #[must_use]
2009    pub fn verify_peer_attestation_simple(
2010        &self,
2011        peer_id: &str,
2012        peer_entangled_id: &crate::attestation::EntangledId,
2013        peer_public_key: &crate::quantum_crypto::ant_quic_integration::MlDsaPublicKey,
2014    ) -> bool {
2015        self.verify_peer_attestation(peer_id, peer_entangled_id, peer_public_key)
2016            .should_allow()
2017    }
2018
    // MCP removed: all MCP tool/service methods have been removed, including the
    // network-integrated remote tool call handler, remote-peer tool listing, and
    // MCP server statistics.
2026
2027    /// Get production resource metrics
2028    pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
2029        if let Some(ref resource_manager) = self.resource_manager {
2030            Ok(resource_manager.get_metrics().await)
2031        } else {
2032            Err(P2PError::Network(
2033                crate::error::NetworkError::ProtocolError(
2034                    "Production resource manager not enabled".to_string().into(),
2035                ),
2036            ))
2037        }
2038    }
2039
2040    /// Connection lifecycle monitor task - processes ant-quic connection events
2041    /// and updates active_connections HashSet and peers map.
2042    ///
2043    /// This version accepts a pre-subscribed receiver to avoid the race condition
2044    /// where early connections could be missed if subscription happens after the task starts.
2045    #[allow(clippy::too_many_arguments)]
2046    async fn connection_lifecycle_monitor_with_rx(
2047        dual_node: Arc<DualStackNetworkNode>,
2048        mut event_rx: broadcast::Receiver<crate::transport::ant_quic_adapter::ConnectionEvent>,
2049        active_connections: Arc<RwLock<HashSet<String>>>,
2050        peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
2051        event_tx: broadcast::Sender<P2PEvent>,
2052        geo_provider: Arc<BgpGeoProvider>,
2053        _local_peer_id: String,
2054    ) {
2055        use crate::transport::ant_quic_adapter::ConnectionEvent;
2056
2057        info!("Connection lifecycle monitor started (pre-subscribed receiver)");
2058
2059        loop {
2060            match event_rx.recv().await {
2061                Ok(event) => {
2062                    match event {
2063                        ConnectionEvent::Established {
2064                            peer_id,
2065                            remote_address,
2066                        } => {
2067                            let peer_id_str = ant_peer_id_to_string(&peer_id);
2068                            debug!(
2069                                "Connection established: peer={}, addr={}",
2070                                peer_id_str, remote_address
2071                            );
2072
2073                            // **GeoIP Validation**
2074                            let ip = remote_address.ip();
2075                            let is_rejected = match ip {
2076                                std::net::IpAddr::V4(v4) => {
2077                                    if let Some(asn) = geo_provider.lookup_ipv4_asn(v4) {
2078                                        geo_provider.is_hosting_asn(asn)
2079                                            || geo_provider.is_vpn_asn(asn)
2080                                    } else {
2081                                        false
2082                                    }
2083                                }
2084                                std::net::IpAddr::V6(v6) => {
2085                                    let info = geo_provider.lookup(v6);
2086                                    info.is_hosting_provider || info.is_vpn_provider
2087                                }
2088                            };
2089
2090                            if is_rejected {
2091                                info!(
2092                                    "Rejecting connection from {} ({}) due to GeoIP policy",
2093                                    peer_id_str, remote_address
2094                                );
2095                                // Actively close the QUIC connection instead of
2096                                // leaking it until idle timeout
2097                                dual_node.disconnect_peer(&peer_id).await;
2098                                continue;
2099                            }
2100
2101                            // Add to active connections
2102                            active_connections.write().await.insert(peer_id_str.clone());
2103
2104                            // Update peer info or insert new
2105                            let mut peers_lock = peers.write().await;
2106                            if let Some(peer_info) = peers_lock.get_mut(&peer_id_str) {
2107                                peer_info.status = ConnectionStatus::Connected;
2108                                peer_info.connected_at = Instant::now();
2109                            } else {
2110                                debug!("Registering new incoming peer: {}", peer_id_str);
2111                                peers_lock.insert(
2112                                    peer_id_str.clone(),
2113                                    PeerInfo {
2114                                        peer_id: peer_id_str.clone(),
2115                                        addresses: vec![remote_address.to_string()],
2116                                        status: ConnectionStatus::Connected,
2117                                        last_seen: Instant::now(),
2118                                        connected_at: Instant::now(),
2119                                        protocols: Vec::new(),
2120                                        heartbeat_count: 0,
2121                                    },
2122                                );
2123                            }
2124
2125                            // Broadcast connection event
2126                            let _ = event_tx.send(P2PEvent::PeerConnected(peer_id_str));
2127                        }
2128                        ConnectionEvent::Lost { peer_id, reason } => {
2129                            let peer_id_str = ant_peer_id_to_string(&peer_id);
2130                            debug!("Connection lost: peer={}, reason={}", peer_id_str, reason);
2131
2132                            // Remove from active connections
2133                            active_connections.write().await.remove(&peer_id_str);
2134
2135                            // Update peer info status
2136                            if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
2137                                peer_info.status = ConnectionStatus::Disconnected;
2138                                peer_info.last_seen = Instant::now();
2139                            }
2140
2141                            // Broadcast disconnection event
2142                            let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
2143                        }
2144                        ConnectionEvent::Failed { peer_id, reason } => {
2145                            let peer_id_str = ant_peer_id_to_string(&peer_id);
2146                            debug!("Connection failed: peer={}, reason={}", peer_id_str, reason);
2147
2148                            // Remove from active connections
2149                            active_connections.write().await.remove(&peer_id_str);
2150
2151                            // Update peer info status
2152                            if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
2153                                peer_info.status = ConnectionStatus::Disconnected;
2154                                peer_info.last_seen = Instant::now();
2155                            }
2156
2157                            // Broadcast disconnection event
2158                            let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
2159                        }
2160                    }
2161                }
2162                Err(broadcast::error::RecvError::Lagged(skipped)) => {
2163                    warn!(
2164                        "Connection event receiver lagged, skipped {} events",
2165                        skipped
2166                    );
2167                }
2168                Err(broadcast::error::RecvError::Closed) => {
2169                    info!("Connection event channel closed, stopping lifecycle monitor");
2170                    break;
2171                }
2172            }
2173        }
2174    }
2175
2176    /// Start connection monitor (called after node initialization)
2177    async fn start_connection_monitor(&self) {
2178        // The monitor task is already spawned in new() with a temporary peers map
2179        // This method is a placeholder for future enhancements where we might
2180        // need to restart the monitor or provide it with updated references
2181        debug!("Connection monitor already running from initialization");
2182    }
2183
2184    /// Keepalive task - sends periodic pings to prevent 30-second idle timeout
2185    ///
2186    /// ant-quic has a 30-second max_idle_timeout. This task sends a small keepalive
2187    /// message every 15 seconds (half the timeout) to all active connections to prevent
2188    /// them from timing out during periods of inactivity.
2189    async fn keepalive_task(
2190        active_connections: Arc<RwLock<HashSet<String>>>,
2191        dual_node: Arc<DualStackNetworkNode>,
2192        shutdown: Arc<AtomicBool>,
2193    ) {
2194        const KEEPALIVE_INTERVAL_SECS: u64 = 15; // Half of 30-second timeout
2195
2196        let mut interval = interval(Duration::from_secs(KEEPALIVE_INTERVAL_SECS));
2197        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
2198
2199        info!(
2200            "Keepalive task started (interval: {}s)",
2201            KEEPALIVE_INTERVAL_SECS
2202        );
2203
2204        loop {
2205            // Check shutdown flag first
2206            if shutdown.load(Ordering::Relaxed) {
2207                info!("Keepalive task shutting down");
2208                break;
2209            }
2210
2211            interval.tick().await;
2212
2213            // Get snapshot of active connections
2214            let peers: Vec<String> = { active_connections.read().await.iter().cloned().collect() };
2215
2216            if peers.is_empty() {
2217                trace!("Keepalive: no active connections");
2218                continue;
2219            }
2220
2221            debug!("Sending keepalive to {} active connections", peers.len());
2222
2223            // Send keepalives concurrently so one slow/stuck peer doesn't
2224            // delay keepalives to all others and cause cascading timeouts.
2225            let futs: Vec<_> = peers
2226                .into_iter()
2227                .map(|peer_id| {
2228                    let node = Arc::clone(&dual_node);
2229                    async move {
2230                        if let Err(e) = node
2231                            .send_to_peer_string_optimized(&peer_id, KEEPALIVE_PAYLOAD)
2232                            .await
2233                        {
2234                            debug!(
2235                                "Failed to send keepalive to peer {}: {} (connection may have closed)",
2236                                peer_id, e
2237                            );
2238                        } else {
2239                            trace!("Keepalive sent to peer: {}", peer_id);
2240                        }
2241                    }
2242                })
2243                .collect();
2244            futures::future::join_all(futs).await;
2245        }
2246
2247        info!("Keepalive task stopped");
2248    }
2249
2250    /// Periodic maintenance task - detects stale peers and removes them
2251    ///
2252    /// This task runs every 100ms and:
2253    /// 1. Detects peers that haven't been seen for longer than stale_threshold
2254    /// 2. Marks them as disconnected and emits PeerDisconnected events
2255    /// 3. Removes fully disconnected peers after cleanup_threshold (2x stale_threshold)
2256    async fn periodic_maintenance_task(
2257        peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2258        active_connections: Arc<RwLock<HashSet<PeerId>>>,
2259        event_tx: broadcast::Sender<P2PEvent>,
2260        stale_threshold: Duration,
2261        shutdown: Arc<AtomicBool>,
2262    ) {
2263        let cleanup_threshold = stale_threshold * 2;
2264        let mut interval = interval(Duration::from_millis(100));
2265        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
2266
2267        info!(
2268            "Periodic maintenance task started (stale threshold: {:?})",
2269            stale_threshold
2270        );
2271
2272        loop {
2273            interval.tick().await;
2274
2275            if shutdown.load(Ordering::SeqCst) {
2276                break;
2277            }
2278
2279            let now = Instant::now();
2280            let mut peers_to_remove: Vec<PeerId> = Vec::new();
2281            let mut peers_to_mark_disconnected: Vec<PeerId> = Vec::new();
2282
2283            // Phase 1: Identify stale and disconnected peers
2284            {
2285                let peers_lock = peers.read().await;
2286                for (peer_id, peer_info) in peers_lock.iter() {
2287                    let elapsed = now.duration_since(peer_info.last_seen);
2288
2289                    match &peer_info.status {
2290                        ConnectionStatus::Connected => {
2291                            if elapsed > stale_threshold {
2292                                debug!(
2293                                    peer_id = %peer_id,
2294                                    elapsed_secs = elapsed.as_secs(),
2295                                    "Peer went stale - marking for disconnection"
2296                                );
2297                                peers_to_mark_disconnected.push(peer_id.clone());
2298                            }
2299                        }
2300                        ConnectionStatus::Disconnected | ConnectionStatus::Failed(_) => {
2301                            if elapsed > cleanup_threshold {
2302                                trace!(
2303                                    peer_id = %peer_id,
2304                                    elapsed_secs = elapsed.as_secs(),
2305                                    "Removing disconnected peer from tracking"
2306                                );
2307                                peers_to_remove.push(peer_id.clone());
2308                            }
2309                        }
2310                        ConnectionStatus::Connecting | ConnectionStatus::Disconnecting => {
2311                            if elapsed > stale_threshold {
2312                                debug!(
2313                                    peer_id = %peer_id,
2314                                    status = ?peer_info.status,
2315                                    "Connection timed out in transitional state"
2316                                );
2317                                peers_to_mark_disconnected.push(peer_id.clone());
2318                            }
2319                        }
2320                    }
2321                }
2322            }
2323
2324            // Phase 2: Mark stale peers as disconnected
2325            // Note: We do NOT reset last_seen here. The cleanup timer uses the original
2326            // last_seen timestamp, so disconnected peers are removed after cleanup_threshold
2327            // from when they were last actually seen, not from when they were marked disconnected.
2328            if !peers_to_mark_disconnected.is_empty() {
2329                let mut peers_lock = peers.write().await;
2330                for peer_id in &peers_to_mark_disconnected {
2331                    if let Some(peer_info) = peers_lock.get_mut(peer_id) {
2332                        peer_info.status = ConnectionStatus::Disconnected;
2333                    }
2334                }
2335            }
2336
2337            // Phase 3: Remove from active_connections and emit events
2338            for peer_id in &peers_to_mark_disconnected {
2339                active_connections.write().await.remove(peer_id);
2340                let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id.clone()));
2341                info!(peer_id = %peer_id, "Stale peer disconnected");
2342            }
2343
2344            // Phase 4: Remove fully cleaned up peers from tracking
2345            if !peers_to_remove.is_empty() {
2346                let mut peers_lock = peers.write().await;
2347                for peer_id in &peers_to_remove {
2348                    peers_lock.remove(peer_id);
2349                    trace!(peer_id = %peer_id, "Peer removed from tracking");
2350                }
2351            }
2352        }
2353
2354        info!("Periodic maintenance task stopped");
2355    }
2356
2357    /// Check system health
2358    pub async fn health_check(&self) -> Result<()> {
2359        if let Some(ref resource_manager) = self.resource_manager {
2360            resource_manager.health_check().await
2361        } else {
2362            // Basic health check without resource manager
2363            let peer_count = self.peer_count().await;
2364            if peer_count > self.config.max_connections {
2365                Err(P2PError::Network(
2366                    crate::error::NetworkError::ProtocolError(
2367                        format!("Too many connections: {peer_count}").into(),
2368                    ),
2369                ))
2370            } else {
2371                Ok(())
2372            }
2373        }
2374    }
2375
2376    /// Get production configuration (if enabled)
2377    pub fn production_config(&self) -> Option<&ProductionConfig> {
2378        self.config.production_config.as_ref()
2379    }
2380
2381    /// Check if production hardening is enabled
2382    pub fn is_production_mode(&self) -> bool {
2383        self.resource_manager.is_some()
2384    }
2385
2386    /// Get DHT reference
2387    pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
2388        self.dht.as_ref()
2389    }
2390
2391    /// Store a value in the DHT
2392    pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
2393        if let Some(ref dht) = self.dht {
2394            let mut dht_instance = dht.write().await;
2395            let dht_key = crate::dht::DhtKey::from_bytes(key);
2396            dht_instance
2397                .store(&dht_key, value.clone())
2398                .await
2399                .map_err(|e| {
2400                    P2PError::Dht(crate::error::DhtError::StoreFailed(
2401                        format!("{:?}: {e}", key).into(),
2402                    ))
2403                })?;
2404
2405            Ok(())
2406        } else {
2407            Err(P2PError::Dht(crate::error::DhtError::RoutingError(
2408                "DHT not enabled".to_string().into(),
2409            )))
2410        }
2411    }
2412
2413    /// Retrieve a value from the DHT
2414    pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
2415        if let Some(ref dht) = self.dht {
2416            let dht_instance = dht.read().await;
2417            let dht_key = crate::dht::DhtKey::from_bytes(key);
2418            let record_result = dht_instance.retrieve(&dht_key).await.map_err(|e| {
2419                P2PError::Dht(crate::error::DhtError::StoreFailed(
2420                    format!("Retrieve failed: {e}").into(),
2421                ))
2422            })?;
2423
2424            Ok(record_result)
2425        } else {
2426            Err(P2PError::Dht(crate::error::DhtError::RoutingError(
2427                "DHT not enabled".to_string().into(),
2428            )))
2429        }
2430    }
2431
2432    /// Add a discovered peer to the bootstrap cache
2433    pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
2434        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2435            let manager = bootstrap_manager.write().await;
2436            let socket_addresses: Vec<std::net::SocketAddr> = addresses
2437                .iter()
2438                .filter_map(|addr| addr.parse().ok())
2439                .collect();
2440            let contact = ContactEntry::new(peer_id, socket_addresses);
2441            manager.add_contact(contact).await.map_err(|e| {
2442                P2PError::Network(crate::error::NetworkError::ProtocolError(
2443                    format!("Failed to add peer to bootstrap cache: {e}").into(),
2444                ))
2445            })?;
2446        }
2447        Ok(())
2448    }
2449
2450    /// Update connection metrics for a peer in the bootstrap cache
2451    pub async fn update_peer_metrics(
2452        &self,
2453        peer_id: &PeerId,
2454        success: bool,
2455        latency_ms: Option<u64>,
2456        _error: Option<String>,
2457    ) -> Result<()> {
2458        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2459            let manager = bootstrap_manager.write().await;
2460
2461            // Create quality metrics based on the connection result
2462            let metrics = QualityMetrics {
2463                success_rate: if success { 1.0 } else { 0.0 },
2464                avg_latency_ms: latency_ms.unwrap_or(0) as f64,
2465                quality_score: if success { 0.8 } else { 0.2 }, // Initial score
2466                last_connection_attempt: chrono::Utc::now(),
2467                last_successful_connection: if success {
2468                    chrono::Utc::now()
2469                } else {
2470                    chrono::Utc::now() - chrono::Duration::hours(1)
2471                },
2472                uptime_score: 0.5,
2473            };
2474
2475            manager
2476                .update_contact_metrics(peer_id, metrics)
2477                .await
2478                .map_err(|e| {
2479                    P2PError::Network(crate::error::NetworkError::ProtocolError(
2480                        format!("Failed to update peer metrics: {e}").into(),
2481                    ))
2482                })?;
2483        }
2484        Ok(())
2485    }
2486
2487    /// Get bootstrap cache statistics
2488    pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
2489        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2490            let manager = bootstrap_manager.read().await;
2491            let stats = manager.get_stats().await.map_err(|e| {
2492                P2PError::Network(crate::error::NetworkError::ProtocolError(
2493                    format!("Failed to get bootstrap stats: {e}").into(),
2494                ))
2495            })?;
2496            Ok(Some(stats))
2497        } else {
2498            Ok(None)
2499        }
2500    }
2501
2502    /// Get the number of cached bootstrap peers
2503    pub async fn cached_peer_count(&self) -> usize {
2504        if let Some(ref _bootstrap_manager) = self.bootstrap_manager
2505            && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
2506        {
2507            return stats.total_contacts;
2508        }
2509        0
2510    }
2511
2512    /// Connect to bootstrap peers
2513    async fn connect_bootstrap_peers(&self) -> Result<()> {
2514        let mut bootstrap_contacts = Vec::new();
2515        let mut used_cache = false;
2516        let mut seen_addresses = std::collections::HashSet::new();
2517
2518        // CLI-provided bootstrap peers take priority - always include them first
2519        let cli_bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
2520            self.config.bootstrap_peers_str.clone()
2521        } else {
2522            // Convert Multiaddr to strings
2523            self.config
2524                .bootstrap_peers
2525                .iter()
2526                .map(|addr| addr.to_string())
2527                .collect::<Vec<_>>()
2528        };
2529
2530        if !cli_bootstrap_peers.is_empty() {
2531            info!(
2532                "Using {} CLI-provided bootstrap peers (priority)",
2533                cli_bootstrap_peers.len()
2534            );
2535            for addr in &cli_bootstrap_peers {
2536                if let Ok(socket_addr) = addr.parse::<std::net::SocketAddr>() {
2537                    seen_addresses.insert(socket_addr);
2538                    let contact = ContactEntry::new(
2539                        format!("cli_peer_{}", addr.chars().take(8).collect::<String>()),
2540                        vec![socket_addr],
2541                    );
2542                    bootstrap_contacts.push(contact);
2543                } else {
2544                    warn!("Invalid bootstrap address format: {}", addr);
2545                }
2546            }
2547        }
2548
2549        // Supplement with cached bootstrap peers (after CLI peers)
2550        // Use QUIC-specific peer selection since we're using ant-quic transport
2551        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2552            let manager = bootstrap_manager.read().await;
2553            match manager.get_quic_bootstrap_peers(20).await {
2554                // Try to get top 20 quality QUIC-enabled peers
2555                Ok(contacts) => {
2556                    if !contacts.is_empty() {
2557                        let mut added_from_cache = 0;
2558                        for contact in contacts {
2559                            // Only add if we haven't already added this address from CLI
2560                            let new_addresses: Vec<_> = contact
2561                                .addresses
2562                                .iter()
2563                                .filter(|addr| !seen_addresses.contains(addr))
2564                                .copied()
2565                                .collect();
2566
2567                            if !new_addresses.is_empty() {
2568                                for addr in &new_addresses {
2569                                    seen_addresses.insert(*addr);
2570                                }
2571                                let mut contact = contact.clone();
2572                                contact.addresses = new_addresses;
2573                                bootstrap_contacts.push(contact);
2574                                added_from_cache += 1;
2575                            }
2576                        }
2577                        if added_from_cache > 0 {
2578                            info!(
2579                                "Added {} cached bootstrap peers (supplementing CLI peers)",
2580                                added_from_cache
2581                            );
2582                            used_cache = true;
2583                        }
2584                    }
2585                }
2586                Err(e) => {
2587                    warn!("Failed to get cached bootstrap peers: {}", e);
2588                }
2589            }
2590        }
2591
2592        if bootstrap_contacts.is_empty() {
2593            info!("No bootstrap peers configured and no cached peers available");
2594            return Ok(());
2595        }
2596
2597        // Connect to bootstrap peers
2598        let mut successful_connections = 0;
2599        for contact in bootstrap_contacts {
2600            for addr in &contact.addresses {
2601                match self.connect_peer(&addr.to_string()).await {
2602                    Ok(peer_id) => {
2603                        info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
2604                        successful_connections += 1;
2605
2606                        // Update bootstrap cache with successful connection
2607                        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2608                            let manager = bootstrap_manager.write().await;
2609                            let mut updated_contact = contact.clone();
2610                            updated_contact.peer_id = peer_id.clone();
2611                            updated_contact.update_connection_result(true, Some(100), None); // Assume 100ms latency for now
2612
2613                            if let Err(e) = manager.add_contact(updated_contact).await {
2614                                warn!("Failed to update bootstrap cache: {}", e);
2615                            }
2616                        }
2617                        break; // Successfully connected, move to next contact
2618                    }
2619                    Err(e) => {
2620                        warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
2621
2622                        // Update bootstrap cache with failed connection
2623                        if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
2624                            let manager = bootstrap_manager.write().await;
2625                            let mut updated_contact = contact.clone();
2626                            updated_contact.update_connection_result(
2627                                false,
2628                                None,
2629                                Some(e.to_string()),
2630                            );
2631
2632                            if let Err(e) = manager.add_contact(updated_contact).await {
2633                                warn!("Failed to update bootstrap cache: {}", e);
2634                            }
2635                        }
2636                    }
2637                }
2638            }
2639        }
2640
2641        if successful_connections == 0 {
2642            if !used_cache {
2643                warn!("Failed to connect to any bootstrap peers");
2644            }
2645            // Starting a node should not be gated on immediate bootstrap connectivity.
2646            // Keep running and allow background discovery / retries to populate peers later.
2647            return Ok(());
2648        }
2649        info!(
2650            "Successfully connected to {} bootstrap peers",
2651            successful_connections
2652        );
2653
2654        Ok(())
2655    }
2656
2657    /// Disconnect from all peers
2658    async fn disconnect_all_peers(&self) -> Result<()> {
2659        let peer_ids: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();
2660
2661        for peer_id in peer_ids {
2662            self.disconnect_peer(&peer_id).await?;
2663        }
2664
2665        Ok(())
2666    }
2667
2668    /// Perform periodic maintenance tasks
2669    ///
2670    /// This function is called periodically (every 100ms) to:
2671    /// 1. Detect and remove stale peers (no activity for configured threshold)
2672    /// 2. Clean up disconnected peers from tracking structures
2673    /// 3. Emit PeerDisconnected events for removed peers
2674    async fn periodic_tasks(&self) -> Result<()> {
2675        // Stale peer threshold from config (default 60s, can be reduced for tests)
2676        let stale_threshold = self.config.stale_peer_threshold;
2677        // Cleanup threshold - disconnected peers are removed after 2x the stale threshold
2678        let cleanup_threshold = stale_threshold * 2;
2679
2680        let now = Instant::now();
2681        let mut peers_to_remove: Vec<PeerId> = Vec::new();
2682        let mut peers_to_mark_disconnected: Vec<PeerId> = Vec::new();
2683
2684        // Phase 1: Identify stale and disconnected peers
2685        {
2686            let peers = self.peers.read().await;
2687            for (peer_id, peer_info) in peers.iter() {
2688                let elapsed = now.duration_since(peer_info.last_seen);
2689
2690                match &peer_info.status {
2691                    ConnectionStatus::Connected => {
2692                        // Check if connected peer has gone stale
2693                        if elapsed > stale_threshold {
2694                            debug!(
2695                                peer_id = %peer_id,
2696                                elapsed_secs = elapsed.as_secs(),
2697                                "Peer went stale - marking for disconnection"
2698                            );
2699                            peers_to_mark_disconnected.push(peer_id.clone());
2700                        }
2701                    }
2702                    ConnectionStatus::Disconnected | ConnectionStatus::Failed(_) => {
2703                        // Remove disconnected/failed peers after cleanup threshold
2704                        if elapsed > cleanup_threshold {
2705                            trace!(
2706                                peer_id = %peer_id,
2707                                elapsed_secs = elapsed.as_secs(),
2708                                "Removing disconnected peer from tracking"
2709                            );
2710                            peers_to_remove.push(peer_id.clone());
2711                        }
2712                    }
2713                    ConnectionStatus::Connecting | ConnectionStatus::Disconnecting => {
2714                        // Transitional states - check for timeout
2715                        if elapsed > stale_threshold {
2716                            debug!(
2717                                peer_id = %peer_id,
2718                                status = ?peer_info.status,
2719                                "Connection timed out in transitional state"
2720                            );
2721                            peers_to_mark_disconnected.push(peer_id.clone());
2722                        }
2723                    }
2724                }
2725            }
2726        }
2727
2728        // Phase 2: Mark stale peers as disconnected
2729        if !peers_to_mark_disconnected.is_empty() {
2730            let mut peers = self.peers.write().await;
2731            for peer_id in &peers_to_mark_disconnected {
2732                if let Some(peer_info) = peers.get_mut(peer_id) {
2733                    peer_info.status = ConnectionStatus::Disconnected;
2734                    peer_info.last_seen = now; // Reset for cleanup timer
2735                }
2736            }
2737        }
2738
2739        // Phase 3: Remove from active_connections and emit events for disconnected peers
2740        for peer_id in &peers_to_mark_disconnected {
2741            // Remove from active connections set
2742            self.active_connections.write().await.remove(peer_id);
2743
2744            // Broadcast disconnection event
2745            let _ = self
2746                .event_tx
2747                .send(P2PEvent::PeerDisconnected(peer_id.clone()));
2748
2749            info!(
2750                peer_id = %peer_id,
2751                "Stale peer disconnected"
2752            );
2753        }
2754
2755        // Phase 4: Remove fully cleaned up peers from tracking
2756        if !peers_to_remove.is_empty() {
2757            let mut peers = self.peers.write().await;
2758            for peer_id in &peers_to_remove {
2759                peers.remove(peer_id);
2760                trace!(peer_id = %peer_id, "Peer removed from tracking");
2761            }
2762        }
2763
2764        Ok(())
2765    }
2766}
2767
2768/// Network sender trait for sending messages
2769#[async_trait::async_trait]
2770pub trait NetworkSender: Send + Sync {
2771    /// Send a message to a specific peer
2772    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;
2773
2774    /// Get our local peer ID
2775    fn local_peer_id(&self) -> &PeerId;
2776}
2777
2778/// Lightweight wrapper for P2PNode to implement NetworkSender
2779#[derive(Clone)]
2780pub struct P2PNetworkSender {
2781    peer_id: PeerId,
2782    // Use channels for async communication with the P2P node
2783    send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2784}
2785
2786impl P2PNetworkSender {
2787    pub fn new(
2788        peer_id: PeerId,
2789        send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2790    ) -> Self {
2791        Self { peer_id, send_tx }
2792    }
2793}
2794
2795/// Implementation of NetworkSender trait for P2PNetworkSender
2796#[async_trait::async_trait]
2797impl NetworkSender for P2PNetworkSender {
2798    /// Send a message to a specific peer via the P2P network
2799    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
2800        self.send_tx
2801            .send((peer_id.clone(), protocol.to_string(), data))
2802            .map_err(|_| {
2803                P2PError::Network(crate::error::NetworkError::ProtocolError(
2804                    "Failed to send message via channel".to_string().into(),
2805                ))
2806            })?;
2807        Ok(())
2808    }
2809
2810    /// Get our local peer ID
2811    fn local_peer_id(&self) -> &PeerId {
2812        &self.peer_id
2813    }
2814}
2815
2816/// Builder pattern for creating P2P nodes
2817pub struct NodeBuilder {
2818    config: NodeConfig,
2819}
2820
2821impl Default for NodeBuilder {
2822    fn default() -> Self {
2823        Self::new()
2824    }
2825}
2826
2827impl NodeBuilder {
2828    /// Create a new node builder
2829    pub fn new() -> Self {
2830        Self {
2831            config: NodeConfig::default(),
2832        }
2833    }
2834
2835    /// Set the peer ID
2836    pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
2837        self.config.peer_id = Some(peer_id);
2838        self
2839    }
2840
2841    /// Add a listen address
2842    pub fn listen_on(mut self, addr: &str) -> Self {
2843        if let Ok(multiaddr) = addr.parse() {
2844            self.config.listen_addrs.push(multiaddr);
2845        }
2846        self
2847    }
2848
2849    /// Add a bootstrap peer
2850    pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
2851        if let Ok(multiaddr) = addr.parse() {
2852            self.config.bootstrap_peers.push(multiaddr);
2853        }
2854        self.config.bootstrap_peers_str.push(addr.to_string());
2855        self
2856    }
2857
2858    /// Enable IPv6 support
2859    pub fn with_ipv6(mut self, enable: bool) -> Self {
2860        self.config.enable_ipv6 = enable;
2861        self
2862    }
2863
2864    // MCP removed: builder methods deleted
2865
2866    /// Set connection timeout
2867    pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
2868        self.config.connection_timeout = timeout;
2869        self
2870    }
2871
2872    /// Set maximum connections
2873    pub fn with_max_connections(mut self, max: usize) -> Self {
2874        self.config.max_connections = max;
2875        self
2876    }
2877
2878    /// Enable production mode with default configuration
2879    pub fn with_production_mode(mut self) -> Self {
2880        self.config.production_config = Some(ProductionConfig::default());
2881        self
2882    }
2883
2884    /// Configure production settings
2885    pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
2886        self.config.production_config = Some(production_config);
2887        self
2888    }
2889
2890    /// Configure IP diversity limits for Sybil protection.
2891    pub fn with_diversity_config(
2892        mut self,
2893        diversity_config: crate::security::IPDiversityConfig,
2894    ) -> Self {
2895        self.config.diversity_config = Some(diversity_config);
2896        self
2897    }
2898
2899    /// Configure DHT settings
2900    pub fn with_dht(mut self, dht_config: DHTConfig) -> Self {
2901        self.config.dht_config = dht_config;
2902        self
2903    }
2904
2905    /// Enable DHT with default configuration
2906    pub fn with_default_dht(mut self) -> Self {
2907        self.config.dht_config = DHTConfig::default();
2908        self
2909    }
2910
2911    /// Build the P2P node
2912    pub async fn build(self) -> Result<P2PNode> {
2913        P2PNode::new(self.config).await
2914    }
2915}
2916
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod diversity_tests {
    use super::*;
    use crate::security::IPDiversityConfig;

    /// Build a `BootstrapManager` the way production does: the diversity config
    /// comes from `config` (falling back to the default), as does the cache
    /// config. The difference is that the cache directory is a fresh temporary
    /// directory.
    ///
    /// The `TempDir` guard is returned with the manager. Dropping it deletes the
    /// directory, so the caller must keep it alive while the manager is in use.
    async fn build_bootstrap_manager_like_prod(
        config: &NodeConfig,
    ) -> (BootstrapManager, tempfile::TempDir) {
        let diversity_config = config.diversity_config.clone().unwrap_or_default();
        // Use a temp dir to avoid conflicts with cached files from old format
        let temp_dir = tempfile::TempDir::new().expect("temp dir");
        let mut cache_config = config.bootstrap_cache_config.clone().unwrap_or_default();
        cache_config.cache_dir = temp_dir.path().to_path_buf();

        let manager = BootstrapManager::with_full_config(
            cache_config,
            crate::rate_limit::JoinRateLimiterConfig::default(),
            diversity_config,
        )
        .await
        .expect("bootstrap manager");
        (manager, temp_dir)
    }

    #[tokio::test]
    async fn test_nodeconfig_diversity_config_used_for_bootstrap() {
        let config = NodeConfig {
            diversity_config: Some(IPDiversityConfig::testnet()),
            ..Default::default()
        };

        let (manager, cache_dir) = build_bootstrap_manager_like_prod(&config).await;
        assert!(manager.diversity_config().is_relaxed());
        assert_eq!(manager.diversity_config().max_nodes_per_asn, 5000);

        // Drop the manager before its cache directory is deleted.
        drop(manager);
        drop(cache_dir);
    }
}
2951
2952/// Helper function to register a new peer
2953async fn register_new_peer(
2954    peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2955    peer_id: &PeerId,
2956    remote_addr: &NetworkAddress,
2957) {
2958    let mut peers_guard = peers.write().await;
2959    let peer_info = PeerInfo {
2960        peer_id: peer_id.clone(),
2961        addresses: vec![remote_addr.to_string()],
2962        connected_at: tokio::time::Instant::now(),
2963        last_seen: tokio::time::Instant::now(),
2964        status: ConnectionStatus::Connected,
2965        protocols: vec!["p2p-chat/1.0.0".to_string()],
2966        heartbeat_count: 0,
2967    };
2968    peers_guard.insert(peer_id.clone(), peer_info);
2969}
2970
2971#[cfg(test)]
2972mod tests {
2973    use super::*;
2974    // MCP removed from tests
2975    use std::time::Duration;
2976    use tokio::time::timeout;
2977
2978    // Test tool handler for network tests
2979
2980    // MCP removed
2981
2982    /// Helper function to create a test node configuration
2983    fn create_test_node_config() -> NodeConfig {
2984        NodeConfig {
2985            peer_id: Some("test_peer_123".to_string()),
2986            listen_addrs: vec![
2987                std::net::SocketAddr::new(std::net::IpAddr::V6(std::net::Ipv6Addr::LOCALHOST), 0),
2988                std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
2989            ],
2990            listen_addr: std::net::SocketAddr::new(
2991                std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
2992                0,
2993            ),
2994            bootstrap_peers: vec![],
2995            bootstrap_peers_str: vec![],
2996            enable_ipv6: true,
2997
2998            connection_timeout: Duration::from_secs(2),
2999            keep_alive_interval: Duration::from_secs(30),
3000            max_connections: 100,
3001            max_incoming_connections: 50,
3002            dht_config: DHTConfig::default(),
3003            security_config: SecurityConfig::default(),
3004            production_config: None,
3005            bootstrap_cache_config: None,
3006            diversity_config: None,
3007            attestation_config: crate::attestation::AttestationConfig::default(),
3008            stale_peer_threshold: default_stale_peer_threshold(),
3009        }
3010    }
3011
3012    /// Helper function to create a test tool
3013    // MCP removed: test tool helper deleted
3014
3015    #[tokio::test]
3016    async fn test_node_config_default() {
3017        let config = NodeConfig::default();
3018
3019        assert!(config.peer_id.is_none());
3020        assert_eq!(config.listen_addrs.len(), 2);
3021        assert!(config.enable_ipv6);
3022        assert_eq!(config.max_connections, 10000); // Fixed: matches actual default
3023        assert_eq!(config.max_incoming_connections, 100);
3024        assert_eq!(config.connection_timeout, Duration::from_secs(30));
3025    }
3026
3027    #[tokio::test]
3028    async fn test_dht_config_default() {
3029        let config = DHTConfig::default();
3030
3031        assert_eq!(config.k_value, 20);
3032        assert_eq!(config.alpha_value, 5);
3033        assert_eq!(config.record_ttl, Duration::from_secs(3600));
3034        assert_eq!(config.refresh_interval, Duration::from_secs(600));
3035    }
3036
3037    #[tokio::test]
3038    async fn test_security_config_default() {
3039        let config = SecurityConfig::default();
3040
3041        assert!(config.enable_noise);
3042        assert!(config.enable_tls);
3043        assert_eq!(config.trust_level, TrustLevel::Basic);
3044    }
3045
3046    #[test]
3047    fn test_trust_level_variants() {
3048        // Test that all trust level variants can be created
3049        let _none = TrustLevel::None;
3050        let _basic = TrustLevel::Basic;
3051        let _full = TrustLevel::Full;
3052
3053        // Test equality
3054        assert_eq!(TrustLevel::None, TrustLevel::None);
3055        assert_eq!(TrustLevel::Basic, TrustLevel::Basic);
3056        assert_eq!(TrustLevel::Full, TrustLevel::Full);
3057        assert_ne!(TrustLevel::None, TrustLevel::Basic);
3058    }
3059
3060    #[test]
3061    fn test_connection_status_variants() {
3062        let connecting = ConnectionStatus::Connecting;
3063        let connected = ConnectionStatus::Connected;
3064        let disconnecting = ConnectionStatus::Disconnecting;
3065        let disconnected = ConnectionStatus::Disconnected;
3066        let failed = ConnectionStatus::Failed("test error".to_string());
3067
3068        assert_eq!(connecting, ConnectionStatus::Connecting);
3069        assert_eq!(connected, ConnectionStatus::Connected);
3070        assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
3071        assert_eq!(disconnected, ConnectionStatus::Disconnected);
3072        assert_ne!(connecting, connected);
3073
3074        if let ConnectionStatus::Failed(msg) = failed {
3075            assert_eq!(msg, "test error");
3076        } else {
3077            panic!("Expected Failed status");
3078        }
3079    }
3080
3081    #[tokio::test]
3082    async fn test_node_creation() -> Result<()> {
3083        let config = create_test_node_config();
3084        let node = P2PNode::new(config).await?;
3085
3086        assert_eq!(node.peer_id(), "test_peer_123");
3087        assert!(!node.is_running().await);
3088        assert_eq!(node.peer_count().await, 0);
3089        assert!(node.connected_peers().await.is_empty());
3090
3091        Ok(())
3092    }
3093
3094    #[tokio::test]
3095    async fn test_node_creation_without_peer_id() -> Result<()> {
3096        let mut config = create_test_node_config();
3097        config.peer_id = None;
3098
3099        let node = P2PNode::new(config).await?;
3100
3101        // Should have generated a peer ID
3102        assert!(node.peer_id().starts_with("peer_"));
3103        assert!(!node.is_running().await);
3104
3105        Ok(())
3106    }
3107
3108    #[tokio::test]
3109    async fn test_node_lifecycle() -> Result<()> {
3110        let config = create_test_node_config();
3111        let node = P2PNode::new(config).await?;
3112
3113        // Initially not running
3114        assert!(!node.is_running().await);
3115
3116        // Start the node
3117        node.start().await?;
3118        assert!(node.is_running().await);
3119
3120        // Check listen addresses were set (at least one)
3121        let listen_addrs = node.listen_addrs().await;
3122        assert!(
3123            !listen_addrs.is_empty(),
3124            "Expected at least one listening address"
3125        );
3126
3127        // Stop the node
3128        node.stop().await?;
3129        assert!(!node.is_running().await);
3130
3131        Ok(())
3132    }
3133
3134    #[tokio::test]
3135    async fn test_peer_connection() -> Result<()> {
3136        let config1 = create_test_node_config();
3137        let mut config2 = create_test_node_config();
3138        config2.peer_id = Some("test_peer_456".to_string());
3139
3140        let node1 = P2PNode::new(config1).await?;
3141        let node2 = P2PNode::new(config2).await?;
3142
3143        node1.start().await?;
3144        node2.start().await?;
3145
3146        let node2_addr = node2
3147            .listen_addrs()
3148            .await
3149            .into_iter()
3150            .find(|a| a.ip().is_ipv4())
3151            .ok_or_else(|| {
3152                P2PError::Network(crate::error::NetworkError::InvalidAddress(
3153                    "Node 2 did not expose an IPv4 listen address".into(),
3154                ))
3155            })?;
3156
3157        // Connect to a real peer
3158        let peer_id = node1.connect_peer(&node2_addr.to_string()).await?;
3159
3160        // Check peer count
3161        assert_eq!(node1.peer_count().await, 1);
3162
3163        // Check connected peers
3164        let connected_peers = node1.connected_peers().await;
3165        assert_eq!(connected_peers.len(), 1);
3166        assert_eq!(connected_peers[0], peer_id);
3167
3168        // Get peer info
3169        let peer_info = node1.peer_info(&peer_id).await;
3170        assert!(peer_info.is_some());
3171        let info = peer_info.expect("Peer info should exist after adding peer");
3172        assert_eq!(info.peer_id, peer_id);
3173        assert_eq!(info.status, ConnectionStatus::Connected);
3174        assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
3175
3176        // Disconnect from peer
3177        node1.disconnect_peer(&peer_id).await?;
3178        assert_eq!(node1.peer_count().await, 0);
3179
3180        node1.stop().await?;
3181        node2.stop().await?;
3182
3183        Ok(())
3184    }
3185
3186    // TODO(windows): Investigate QUIC connection issues on Windows CI
3187    // This test consistently fails on Windows GitHub Actions runners with
3188    // "All connect attempts failed" even with IPv4-only config, long delays,
3189    // and multiple retry attempts. The underlying ant-quic library may have
3190    // issues on Windows that need investigation.
3191    // See: https://github.com/dirvine/saorsa-core/issues/TBD
3192    #[cfg_attr(target_os = "windows", ignore)]
3193    #[tokio::test]
3194    async fn test_event_subscription() -> Result<()> {
3195        // Configure both nodes to use only IPv4 for reliable cross-platform testing
3196        // This is important because:
3197        // 1. local_addr() returns the first address from listen_addrs
3198        // 2. The default config puts IPv6 first, which may not work on all Windows setups
3199        let ipv4_localhost =
3200            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
3201
3202        let mut config1 = create_test_node_config();
3203        config1.listen_addr = ipv4_localhost;
3204        config1.listen_addrs = vec![ipv4_localhost];
3205        config1.enable_ipv6 = false;
3206
3207        let mut config2 = create_test_node_config();
3208        config2.peer_id = Some("test_peer_456".to_string());
3209        config2.listen_addr = ipv4_localhost;
3210        config2.listen_addrs = vec![ipv4_localhost];
3211        config2.enable_ipv6 = false;
3212
3213        let node1 = P2PNode::new(config1).await?;
3214        let node2 = P2PNode::new(config2).await?;
3215
3216        node1.start().await?;
3217        node2.start().await?;
3218
3219        // Wait for nodes to fully bind their listening sockets
3220        // Windows network stack initialization can be significantly slower
3221        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
3222
3223        let mut events = node1.subscribe_events();
3224
3225        // Get the actual listening address using local_addr() for reliability
3226        let node2_addr = node2.local_addr().ok_or_else(|| {
3227            P2PError::Network(crate::error::NetworkError::ProtocolError(
3228                "No listening address".to_string().into(),
3229            ))
3230        })?;
3231
3232        // Connect to a peer with retry logic for Windows reliability
3233        // The QUIC library may need additional time to fully initialize
3234        let mut peer_id = None;
3235        for attempt in 0..3 {
3236            if attempt > 0 {
3237                tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
3238            }
3239            match timeout(Duration::from_secs(2), node1.connect_peer(&node2_addr)).await {
3240                Ok(Ok(id)) => {
3241                    peer_id = Some(id);
3242                    break;
3243                }
3244                Ok(Err(_)) | Err(_) => continue,
3245            }
3246        }
3247        let peer_id = peer_id.ok_or_else(|| {
3248            P2PError::Network(crate::error::NetworkError::ProtocolError(
3249                "Failed to connect after 3 attempts".to_string().into(),
3250            ))
3251        })?;
3252
3253        // Check for PeerConnected event
3254        let event = timeout(Duration::from_secs(2), events.recv()).await;
3255        assert!(event.is_ok());
3256
3257        let event_result = event
3258            .expect("Should receive event")
3259            .expect("Event should not be error");
3260        match event_result {
3261            P2PEvent::PeerConnected(event_peer_id) => {
3262                assert_eq!(event_peer_id, peer_id);
3263            }
3264            _ => panic!("Expected PeerConnected event"),
3265        }
3266
3267        // Disconnect from peer (this should emit another event)
3268        node1.disconnect_peer(&peer_id).await?;
3269
3270        // Check for PeerDisconnected event
3271        let event = timeout(Duration::from_secs(2), events.recv()).await;
3272        assert!(event.is_ok());
3273
3274        let event_result = event
3275            .expect("Should receive event")
3276            .expect("Event should not be error");
3277        match event_result {
3278            P2PEvent::PeerDisconnected(event_peer_id) => {
3279                assert_eq!(event_peer_id, peer_id);
3280            }
3281            _ => panic!("Expected PeerDisconnected event"),
3282        }
3283
3284        node1.stop().await?;
3285        node2.stop().await?;
3286
3287        Ok(())
3288    }
3289
3290    // TODO(windows): Same QUIC connection issues as test_event_subscription
3291    #[cfg_attr(target_os = "windows", ignore)]
3292    #[tokio::test]
3293    async fn test_message_sending() -> Result<()> {
3294        // Create two nodes
3295        let mut config1 = create_test_node_config();
3296        config1.listen_addr =
3297            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
3298        let node1 = P2PNode::new(config1).await?;
3299        node1.start().await?;
3300
3301        let mut config2 = create_test_node_config();
3302        config2.listen_addr =
3303            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
3304        let node2 = P2PNode::new(config2).await?;
3305        node2.start().await?;
3306
3307        // Wait a bit for nodes to start listening
3308        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
3309
3310        // Get actual listening address of node2
3311        let node2_addr = node2.local_addr().ok_or_else(|| {
3312            P2PError::Network(crate::error::NetworkError::ProtocolError(
3313                "No listening address".to_string().into(),
3314            ))
3315        })?;
3316
3317        // Connect node1 to node2
3318        let peer_id =
3319            match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
3320                Ok(res) => res?,
3321                Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
3322            };
3323
3324        // Wait a bit for connection to establish
3325        tokio::time::sleep(tokio::time::Duration::from_millis(30)).await;
3326
3327        // Send a message
3328        let message_data = b"Hello, peer!".to_vec();
3329        let result = match timeout(
3330            Duration::from_millis(500),
3331            node1.send_message(&peer_id, "test-protocol", message_data),
3332        )
3333        .await
3334        {
3335            Ok(res) => res,
3336            Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
3337        };
3338        // For now, we'll just check that we don't get a "not connected" error
3339        // The actual send might fail due to no handler on the other side
3340        if let Err(e) = &result {
3341            assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
3342        }
3343
3344        // Try to send to non-existent peer
3345        let non_existent_peer = "non_existent_peer".to_string();
3346        let result = node1
3347            .send_message(&non_existent_peer, "test-protocol", vec![])
3348            .await;
3349        assert!(result.is_err(), "Sending to non-existent peer should fail");
3350
3351        Ok(())
3352    }
3353
3354    #[tokio::test]
3355    async fn test_remote_mcp_operations() -> Result<()> {
3356        let config = create_test_node_config();
3357        let node = P2PNode::new(config).await?;
3358
3359        // MCP removed; test reduced to simple start/stop
3360        node.start().await?;
3361        node.stop().await?;
3362        Ok(())
3363    }
3364
3365    #[tokio::test]
3366    async fn test_health_check() -> Result<()> {
3367        let config = create_test_node_config();
3368        let node = P2PNode::new(config).await?;
3369
3370        // Health check should pass with no connections
3371        let result = node.health_check().await;
3372        assert!(result.is_ok());
3373
3374        // Note: We're not actually connecting to real peers here
3375        // since that would require running bootstrap nodes.
3376        // The health check should still pass with no connections.
3377
3378        Ok(())
3379    }
3380
3381    #[tokio::test]
3382    async fn test_node_uptime() -> Result<()> {
3383        let config = create_test_node_config();
3384        let node = P2PNode::new(config).await?;
3385
3386        let uptime1 = node.uptime();
3387        assert!(uptime1 >= Duration::from_secs(0));
3388
3389        // Wait a bit
3390        tokio::time::sleep(Duration::from_millis(10)).await;
3391
3392        let uptime2 = node.uptime();
3393        assert!(uptime2 > uptime1);
3394
3395        Ok(())
3396    }
3397
3398    #[tokio::test]
3399    async fn test_node_config_access() -> Result<()> {
3400        let config = create_test_node_config();
3401        let expected_peer_id = config.peer_id.clone();
3402        let node = P2PNode::new(config).await?;
3403
3404        let node_config = node.config();
3405        assert_eq!(node_config.peer_id, expected_peer_id);
3406        assert_eq!(node_config.max_connections, 100);
3407        // MCP removed
3408
3409        Ok(())
3410    }
3411
3412    #[tokio::test]
3413    async fn test_mcp_server_access() -> Result<()> {
3414        let config = create_test_node_config();
3415        let _node = P2PNode::new(config).await?;
3416
3417        // MCP removed
3418        Ok(())
3419    }
3420
3421    #[tokio::test]
3422    async fn test_dht_access() -> Result<()> {
3423        let config = create_test_node_config();
3424        let node = P2PNode::new(config).await?;
3425
3426        // Should have DHT
3427        assert!(node.dht().is_some());
3428
3429        Ok(())
3430    }
3431
3432    #[tokio::test]
3433    async fn test_node_builder() -> Result<()> {
3434        // Create a config using the builder but don't actually build a real node
3435        let builder = P2PNode::builder()
3436            .with_peer_id("builder_test_peer".to_string())
3437            .listen_on("/ip4/127.0.0.1/tcp/0")
3438            .listen_on("/ip6/::1/tcp/0")
3439            .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9000") // Use a valid port number
3440            .with_ipv6(true)
3441            .with_connection_timeout(Duration::from_secs(15))
3442            .with_max_connections(200);
3443
3444        // Test the configuration that was built
3445        let config = builder.config;
3446        assert_eq!(config.peer_id, Some("builder_test_peer".to_string()));
3447        assert_eq!(config.listen_addrs.len(), 2); // 2 added by builder (no defaults)
3448        assert_eq!(config.bootstrap_peers_str.len(), 1); // Check bootstrap_peers_str instead
3449        assert!(config.enable_ipv6);
3450        assert_eq!(config.connection_timeout, Duration::from_secs(15));
3451        assert_eq!(config.max_connections, 200);
3452
3453        Ok(())
3454    }
3455
3456    #[tokio::test]
3457    async fn test_bootstrap_peers() -> Result<()> {
3458        let mut config = create_test_node_config();
3459        config.bootstrap_peers = vec![
3460            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9200),
3461            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9201),
3462        ];
3463
3464        let node = P2PNode::new(config).await?;
3465
3466        // Start node (which attempts to connect to bootstrap peers)
3467        node.start().await?;
3468
3469        // In a test environment, bootstrap peers may not be available
3470        // The test verifies the node starts correctly with bootstrap configuration
3471        // Peer count may include local/internal tracking, so we just verify it's reasonable
3472        let _peer_count = node.peer_count().await;
3473
3474        node.stop().await?;
3475        Ok(())
3476    }
3477
3478    #[tokio::test]
3479    async fn test_production_mode_disabled() -> Result<()> {
3480        let config = create_test_node_config();
3481        let node = P2PNode::new(config).await?;
3482
3483        assert!(!node.is_production_mode());
3484        assert!(node.production_config().is_none());
3485
3486        // Resource metrics should fail when production mode is disabled
3487        let result = node.resource_metrics().await;
3488        assert!(result.is_err());
3489        assert!(result.unwrap_err().to_string().contains("not enabled"));
3490
3491        Ok(())
3492    }
3493
3494    #[tokio::test]
3495    async fn test_network_event_variants() {
3496        // Test that all network event variants can be created
3497        let peer_id = "test_peer".to_string();
3498        let address = "/ip4/127.0.0.1/tcp/9000".to_string();
3499
3500        let _peer_connected = NetworkEvent::PeerConnected {
3501            peer_id: peer_id.clone(),
3502            addresses: vec![address.clone()],
3503        };
3504
3505        let _peer_disconnected = NetworkEvent::PeerDisconnected {
3506            peer_id: peer_id.clone(),
3507            reason: "test disconnect".to_string(),
3508        };
3509
3510        let _message_received = NetworkEvent::MessageReceived {
3511            peer_id: peer_id.clone(),
3512            protocol: "test-protocol".to_string(),
3513            data: vec![1, 2, 3],
3514        };
3515
3516        let _connection_failed = NetworkEvent::ConnectionFailed {
3517            peer_id: Some(peer_id.clone()),
3518            address: address.clone(),
3519            error: "connection refused".to_string(),
3520        };
3521
3522        let _dht_stored = NetworkEvent::DHTRecordStored {
3523            key: vec![1, 2, 3],
3524            value: vec![4, 5, 6],
3525        };
3526
3527        let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
3528            key: vec![1, 2, 3],
3529            value: Some(vec![4, 5, 6]),
3530        };
3531    }
3532
3533    #[tokio::test]
3534    async fn test_peer_info_structure() {
3535        let peer_info = PeerInfo {
3536            peer_id: "test_peer".to_string(),
3537            addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
3538            connected_at: Instant::now(),
3539            last_seen: Instant::now(),
3540            status: ConnectionStatus::Connected,
3541            protocols: vec!["test-protocol".to_string()],
3542            heartbeat_count: 0,
3543        };
3544
3545        assert_eq!(peer_info.peer_id, "test_peer");
3546        assert_eq!(peer_info.addresses.len(), 1);
3547        assert_eq!(peer_info.status, ConnectionStatus::Connected);
3548        assert_eq!(peer_info.protocols.len(), 1);
3549    }
3550
3551    #[tokio::test]
3552    async fn test_serialization() -> Result<()> {
3553        // Test that configs can be serialized/deserialized
3554        let config = create_test_node_config();
3555        let serialized = serde_json::to_string(&config)?;
3556        let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
3557
3558        assert_eq!(config.peer_id, deserialized.peer_id);
3559        assert_eq!(config.listen_addrs, deserialized.listen_addrs);
3560        assert_eq!(config.enable_ipv6, deserialized.enable_ipv6);
3561
3562        Ok(())
3563    }
3564
3565    #[tokio::test]
3566    async fn test_get_peer_id_by_address_found() -> Result<()> {
3567        let config = create_test_node_config();
3568        let node = P2PNode::new(config).await?;
3569
3570        // Manually insert a peer for testing
3571        let test_peer_id = "peer_test_123".to_string();
3572        let test_address = "192.168.1.100:9000".to_string();
3573
3574        let peer_info = PeerInfo {
3575            peer_id: test_peer_id.clone(),
3576            addresses: vec![test_address.clone()],
3577            connected_at: Instant::now(),
3578            last_seen: Instant::now(),
3579            status: ConnectionStatus::Connected,
3580            protocols: vec!["test-protocol".to_string()],
3581            heartbeat_count: 0,
3582        };
3583
3584        node.peers
3585            .write()
3586            .await
3587            .insert(test_peer_id.clone(), peer_info);
3588
3589        // Test: Find peer by address
3590        let found_peer_id = node.get_peer_id_by_address(&test_address).await;
3591        assert_eq!(found_peer_id, Some(test_peer_id));
3592
3593        Ok(())
3594    }
3595
3596    #[tokio::test]
3597    async fn test_get_peer_id_by_address_not_found() -> Result<()> {
3598        let config = create_test_node_config();
3599        let node = P2PNode::new(config).await?;
3600
3601        // Test: Try to find a peer that doesn't exist
3602        let result = node.get_peer_id_by_address("192.168.1.200:9000").await;
3603        assert_eq!(result, None);
3604
3605        Ok(())
3606    }
3607
3608    #[tokio::test]
3609    async fn test_get_peer_id_by_address_invalid_format() -> Result<()> {
3610        let config = create_test_node_config();
3611        let node = P2PNode::new(config).await?;
3612
3613        // Test: Invalid address format should return None
3614        let result = node.get_peer_id_by_address("invalid-address").await;
3615        assert_eq!(result, None);
3616
3617        Ok(())
3618    }
3619
3620    #[tokio::test]
3621    async fn test_get_peer_id_by_address_multiple_peers() -> Result<()> {
3622        let config = create_test_node_config();
3623        let node = P2PNode::new(config).await?;
3624
3625        // Add multiple peers with different addresses
3626        let peer1_id = "peer_1".to_string();
3627        let peer1_addr = "192.168.1.101:9001".to_string();
3628
3629        let peer2_id = "peer_2".to_string();
3630        let peer2_addr = "192.168.1.102:9002".to_string();
3631
3632        let peer1_info = PeerInfo {
3633            peer_id: peer1_id.clone(),
3634            addresses: vec![peer1_addr.clone()],
3635            connected_at: Instant::now(),
3636            last_seen: Instant::now(),
3637            status: ConnectionStatus::Connected,
3638            protocols: vec!["test-protocol".to_string()],
3639            heartbeat_count: 0,
3640        };
3641
3642        let peer2_info = PeerInfo {
3643            peer_id: peer2_id.clone(),
3644            addresses: vec![peer2_addr.clone()],
3645            connected_at: Instant::now(),
3646            last_seen: Instant::now(),
3647            status: ConnectionStatus::Connected,
3648            protocols: vec!["test-protocol".to_string()],
3649            heartbeat_count: 0,
3650        };
3651
3652        node.peers
3653            .write()
3654            .await
3655            .insert(peer1_id.clone(), peer1_info);
3656        node.peers
3657            .write()
3658            .await
3659            .insert(peer2_id.clone(), peer2_info);
3660
3661        // Test: Find each peer by their unique address
3662        let found_peer1 = node.get_peer_id_by_address(&peer1_addr).await;
3663        let found_peer2 = node.get_peer_id_by_address(&peer2_addr).await;
3664
3665        assert_eq!(found_peer1, Some(peer1_id));
3666        assert_eq!(found_peer2, Some(peer2_id));
3667
3668        Ok(())
3669    }
3670
3671    #[tokio::test]
3672    async fn test_list_active_connections_empty() -> Result<()> {
3673        let config = create_test_node_config();
3674        let node = P2PNode::new(config).await?;
3675
3676        // Test: No connections initially
3677        let connections = node.list_active_connections().await;
3678        assert!(connections.is_empty());
3679
3680        Ok(())
3681    }
3682
3683    #[tokio::test]
3684    async fn test_list_active_connections_with_peers() -> Result<()> {
3685        let config = create_test_node_config();
3686        let node = P2PNode::new(config).await?;
3687
3688        // Add multiple peers
3689        let peer1_id = "peer_1".to_string();
3690        let peer1_addrs = vec![
3691            "192.168.1.101:9001".to_string(),
3692            "192.168.1.101:9002".to_string(),
3693        ];
3694
3695        let peer2_id = "peer_2".to_string();
3696        let peer2_addrs = vec!["192.168.1.102:9003".to_string()];
3697
3698        let peer1_info = PeerInfo {
3699            peer_id: peer1_id.clone(),
3700            addresses: peer1_addrs.clone(),
3701            connected_at: Instant::now(),
3702            last_seen: Instant::now(),
3703            status: ConnectionStatus::Connected,
3704            protocols: vec!["test-protocol".to_string()],
3705            heartbeat_count: 0,
3706        };
3707
3708        let peer2_info = PeerInfo {
3709            peer_id: peer2_id.clone(),
3710            addresses: peer2_addrs.clone(),
3711            connected_at: Instant::now(),
3712            last_seen: Instant::now(),
3713            status: ConnectionStatus::Connected,
3714            protocols: vec!["test-protocol".to_string()],
3715            heartbeat_count: 0,
3716        };
3717
3718        node.peers
3719            .write()
3720            .await
3721            .insert(peer1_id.clone(), peer1_info);
3722        node.peers
3723            .write()
3724            .await
3725            .insert(peer2_id.clone(), peer2_info);
3726
3727        // Also add to active_connections (list_active_connections iterates over this)
3728        node.active_connections
3729            .write()
3730            .await
3731            .insert(peer1_id.clone());
3732        node.active_connections
3733            .write()
3734            .await
3735            .insert(peer2_id.clone());
3736
3737        // Test: List all active connections
3738        let connections = node.list_active_connections().await;
3739        assert_eq!(connections.len(), 2);
3740
3741        // Verify peer1 and peer2 are in the list
3742        let peer1_conn = connections.iter().find(|(id, _)| id == &peer1_id);
3743        let peer2_conn = connections.iter().find(|(id, _)| id == &peer2_id);
3744
3745        assert!(peer1_conn.is_some());
3746        assert!(peer2_conn.is_some());
3747
3748        // Verify addresses match
3749        assert_eq!(peer1_conn.unwrap().1, peer1_addrs);
3750        assert_eq!(peer2_conn.unwrap().1, peer2_addrs);
3751
3752        Ok(())
3753    }
3754
3755    #[tokio::test]
3756    async fn test_remove_peer_success() -> Result<()> {
3757        let config = create_test_node_config();
3758        let node = P2PNode::new(config).await?;
3759
3760        // Add a peer
3761        let peer_id = "peer_to_remove".to_string();
3762        let peer_info = PeerInfo {
3763            peer_id: peer_id.clone(),
3764            addresses: vec!["192.168.1.100:9000".to_string()],
3765            connected_at: Instant::now(),
3766            last_seen: Instant::now(),
3767            status: ConnectionStatus::Connected,
3768            protocols: vec!["test-protocol".to_string()],
3769            heartbeat_count: 0,
3770        };
3771
3772        node.peers.write().await.insert(peer_id.clone(), peer_info);
3773
3774        // Verify peer exists
3775        assert!(node.is_peer_connected(&peer_id).await);
3776
3777        // Remove the peer
3778        let removed = node.remove_peer(&peer_id).await;
3779        assert!(removed);
3780
3781        // Verify peer no longer exists
3782        assert!(!node.is_peer_connected(&peer_id).await);
3783
3784        Ok(())
3785    }
3786
3787    #[tokio::test]
3788    async fn test_remove_peer_nonexistent() -> Result<()> {
3789        let config = create_test_node_config();
3790        let node = P2PNode::new(config).await?;
3791
3792        // Try to remove a peer that doesn't exist
3793        let removed = node.remove_peer(&"nonexistent_peer".to_string()).await;
3794        assert!(!removed);
3795
3796        Ok(())
3797    }
3798
3799    #[tokio::test]
3800    async fn test_is_peer_connected() -> Result<()> {
3801        let config = create_test_node_config();
3802        let node = P2PNode::new(config).await?;
3803
3804        let peer_id = "test_peer".to_string();
3805
3806        // Initially not connected
3807        assert!(!node.is_peer_connected(&peer_id).await);
3808
3809        // Add peer
3810        let peer_info = PeerInfo {
3811            peer_id: peer_id.clone(),
3812            addresses: vec!["192.168.1.100:9000".to_string()],
3813            connected_at: Instant::now(),
3814            last_seen: Instant::now(),
3815            status: ConnectionStatus::Connected,
3816            protocols: vec!["test-protocol".to_string()],
3817            heartbeat_count: 0,
3818        };
3819
3820        node.peers.write().await.insert(peer_id.clone(), peer_info);
3821
3822        // Now connected
3823        assert!(node.is_peer_connected(&peer_id).await);
3824
3825        // Remove peer
3826        node.remove_peer(&peer_id).await;
3827
3828        // No longer connected
3829        assert!(!node.is_peer_connected(&peer_id).await);
3830
3831        Ok(())
3832    }
3833
3834    #[test]
3835    fn test_normalize_ipv6_wildcard() {
3836        use std::net::{IpAddr, Ipv6Addr, SocketAddr};
3837
3838        let wildcard = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 8080);
3839        let normalized = normalize_wildcard_to_loopback(wildcard);
3840
3841        assert_eq!(normalized.ip(), IpAddr::V6(Ipv6Addr::LOCALHOST));
3842        assert_eq!(normalized.port(), 8080);
3843    }
3844
3845    #[test]
3846    fn test_normalize_ipv4_wildcard() {
3847        use std::net::{IpAddr, Ipv4Addr, SocketAddr};
3848
3849        let wildcard = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9000);
3850        let normalized = normalize_wildcard_to_loopback(wildcard);
3851
3852        assert_eq!(normalized.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
3853        assert_eq!(normalized.port(), 9000);
3854    }
3855
3856    #[test]
3857    fn test_normalize_specific_address_unchanged() {
3858        let specific: std::net::SocketAddr = "192.168.1.100:3000".parse().unwrap();
3859        let normalized = normalize_wildcard_to_loopback(specific);
3860
3861        assert_eq!(normalized, specific);
3862    }
3863
3864    #[test]
3865    fn test_normalize_loopback_unchanged() {
3866        use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
3867
3868        let loopback_v6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5000);
3869        let normalized_v6 = normalize_wildcard_to_loopback(loopback_v6);
3870        assert_eq!(normalized_v6, loopback_v6);
3871
3872        let loopback_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
3873        let normalized_v4 = normalize_wildcard_to_loopback(loopback_v4);
3874        assert_eq!(normalized_v4, loopback_v4);
3875    }
3876
3877    // ---- parse_protocol_message regression tests ----
3878
3879    /// Helper to create a bincode-serialized WireMessage for tests
3880    fn make_wire_bytes(protocol: &str, data: Vec<u8>, from: &str, timestamp: u64) -> Vec<u8> {
3881        let msg = WireMessage {
3882            protocol: protocol.to_string(),
3883            data,
3884            from: from.to_string(),
3885            timestamp,
3886        };
3887        postcard::to_stdvec(&msg).unwrap()
3888    }
3889
3890    #[test]
3891    fn test_parse_protocol_message_uses_transport_peer_id_as_source() {
3892        // Regression: P2PEvent::Message.source must be the transport peer ID,
3893        // NOT the "from" field from the wire message.  This ensures consumers
3894        // can pass source directly to send_message().
3895        let transport_id = "abcdef0123456789";
3896        let logical_id = "spoofed-logical-id";
3897        let bytes = make_wire_bytes("test/v1", vec![1, 2, 3], logical_id, 1000);
3898
3899        let event =
3900            parse_protocol_message(&bytes, transport_id).expect("valid message should parse");
3901
3902        match event {
3903            P2PEvent::Message {
3904                topic,
3905                source,
3906                data,
3907            } => {
3908                assert_eq!(source, transport_id, "source must be the transport peer ID");
3909                assert_ne!(
3910                    source, logical_id,
3911                    "source must NOT be the logical 'from' field"
3912                );
3913                assert_eq!(topic, "test/v1");
3914                assert_eq!(data, vec![1u8, 2, 3]);
3915            }
3916            other => panic!("expected P2PEvent::Message, got {:?}", other),
3917        }
3918    }
3919
3920    #[test]
3921    fn test_parse_protocol_message_rejects_invalid_bytes() {
3922        // Random bytes that are not valid bincode should be rejected
3923        assert!(parse_protocol_message(b"not valid bincode", "peer-id").is_none());
3924    }
3925
3926    #[test]
3927    fn test_parse_protocol_message_rejects_truncated_message() {
3928        // A truncated bincode message should fail to deserialize
3929        let full_bytes = make_wire_bytes("test/v1", vec![1, 2, 3], "sender", 1000);
3930        let truncated = &full_bytes[..full_bytes.len() / 2];
3931        assert!(parse_protocol_message(truncated, "peer-id").is_none());
3932    }
3933
3934    #[test]
3935    fn test_parse_protocol_message_empty_payload() {
3936        let bytes = make_wire_bytes("ping", vec![], "sender", 1000);
3937
3938        let event = parse_protocol_message(&bytes, "transport-peer")
3939            .expect("valid message with empty data should parse");
3940
3941        match event {
3942            P2PEvent::Message { data, .. } => assert!(data.is_empty()),
3943            other => panic!("expected P2PEvent::Message, got {:?}", other),
3944        }
3945    }
3946
3947    #[test]
3948    fn test_parse_protocol_message_preserves_binary_payload() {
3949        // Verify that arbitrary byte values (including 0xFF, 0x00) survive round-trip
3950        let payload: Vec<u8> = (0..=255).collect();
3951        let bytes = make_wire_bytes("binary/v1", payload.clone(), "sender", 42);
3952
3953        let event = parse_protocol_message(&bytes, "peer-id")
3954            .expect("valid message with full byte range should parse");
3955
3956        match event {
3957            P2PEvent::Message { data, topic, .. } => {
3958                assert_eq!(topic, "binary/v1");
3959                assert_eq!(
3960                    data, payload,
3961                    "payload must survive bincode round-trip exactly"
3962                );
3963            }
3964            other => panic!("expected P2PEvent::Message, got {:?}", other),
3965        }
3966    }
3967}