Skip to main content

saorsa_core/
network.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: david@saorsalabs.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! Network module
15//!
16//! This module provides core networking functionality for the P2P Foundation.
17//! It handles peer connections, network events, and node lifecycle management.
18
19#[cfg(feature = "adaptive-ml")]
20use crate::adaptive::{EigenTrustEngine, NodeId as AdaptiveNodeId, NodeStatisticsUpdate};
21use crate::bgp_geo_provider::BgpGeoProvider;
22use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
23use crate::config::Config;
24use crate::dht::DHT;
25use crate::error::{NetworkError, P2PError, P2pResult as Result};
26use crate::security::GeoProvider;
27
28use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
29use crate::transport::ant_quic_adapter::{DualStackNetworkNode, ant_peer_id_to_string};
30use crate::validation::RateLimitConfig;
31use crate::validation::RateLimiter;
32use crate::{NetworkAddress, PeerId};
33use serde::{Deserialize, Serialize};
34use std::collections::{HashMap, HashSet};
35use std::sync::Arc;
36use std::sync::atomic::{AtomicBool, Ordering};
37use std::time::Duration;
38use tokio::sync::{RwLock, broadcast};
39use tokio::time::{Instant, interval};
40use tracing::{debug, info, trace, warn};
41
/// Wire protocol message format for P2P communication.
///
/// Serialized with bincode for compact binary encoding. This replaced the
/// previous JSON format for better performance and smaller wire size.
///
/// NOTE(review): bincode encodes struct fields positionally, so the field
/// order below is effectively part of the wire format. Reordering, adding or
/// removing fields breaks interoperability with peers running other versions.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct WireMessage {
    /// Protocol/topic identifier
    protocol: String,
    /// Raw payload bytes (opaque at this layer)
    data: Vec<u8>,
    /// Sender's peer ID (verified against transport-level identity)
    from: String,
    /// Unix timestamp in seconds
    timestamp: u64,
}
58
/// Payload bytes used for keepalive messages to prevent idle connections from
/// timing out.
const KEEPALIVE_PAYLOAD: &[u8] = b"keepalive";

/// Capacity of the internal channel used by the message receiving system.
///
/// NOTE(review): presumably the bounded queue between the receive tasks and the
/// consumer task tracked in `P2PNode::recv_handles`; confirm against the code
/// that creates the channel.
const MESSAGE_RECV_CHANNEL_CAPACITY: usize = 256;
64
/// Configuration for a P2P node.
///
/// Can be constructed with `NodeConfig::new`, `NodeConfig::default`,
/// `NodeConfig::from_config`, `NodeConfig::with_listen_addr` or
/// `NodeConfig::builder`.
///
/// When deserializing, every field must be present except
/// `stale_peer_threshold`, which falls back to `default_stale_peer_threshold`
/// (60 seconds).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeConfig {
    /// Local peer ID for this node. When `None`, `P2PNode::new` generates a
    /// random `peer_…` identifier.
    pub peer_id: Option<PeerId>,

    /// Addresses to listen on for incoming connections
    pub listen_addrs: Vec<std::net::SocketAddr>,

    /// Primary listen address, kept for compatibility with code that expects a
    /// single address.
    pub listen_addr: std::net::SocketAddr,

    /// Bootstrap peers to connect to on startup (legacy socket-address form)
    pub bootstrap_peers: Vec<std::net::SocketAddr>,

    /// Bootstrap peers as strings (for integration tests)
    pub bootstrap_peers_str: Vec<String>,

    /// Enable IPv6 support
    pub enable_ipv6: bool,

    // MCP configuration was removed from this struct; it will be redesigned later.
    /// Connection timeout duration
    pub connection_timeout: Duration,

    /// Keep-alive interval for connections
    pub keep_alive_interval: Duration,

    /// Maximum number of concurrent connections
    pub max_connections: usize,

    /// Maximum number of incoming connections. The constructors in this module
    /// take it from `security.connection_limit` of the source `Config`.
    pub max_incoming_connections: usize,

    /// DHT configuration
    pub dht_config: DHTConfig,

    /// Security configuration
    pub security_config: SecurityConfig,

    /// Production hardening configuration. When set, `P2PNode::new` creates a
    /// `ResourceManager` from it.
    pub production_config: Option<ProductionConfig>,

    /// Bootstrap cache configuration. When set, `P2PNode::new` tries to create
    /// a `BootstrapManager`; if that fails, it logs a warning and continues
    /// without a cache.
    pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,

    /// Optional IP diversity configuration for Sybil protection tuning.
    ///
    /// When set, this configuration is used by bootstrap peer discovery and
    /// other diversity-enforcing subsystems. If `None`, defaults are used.
    pub diversity_config: Option<crate::security::IPDiversityConfig>,

    /// Stale peer threshold: peers with no activity for this duration are
    /// considered stale. Defaults to 60 seconds, both in the constructors and
    /// when the field is missing from serialized input. Can be reduced for
    /// testing purposes.
    #[serde(default = "default_stale_peer_threshold")]
    pub stale_peer_threshold: Duration,
}
122
/// Default for `NodeConfig::stale_peer_threshold`: one minute.
///
/// Used both by the `NodeConfig` constructors and as the serde fallback when the
/// field is absent from serialized configuration.
fn default_stale_peer_threshold() -> Duration {
    const ONE_MINUTE: Duration = Duration::from_secs(60);
    ONE_MINUTE
}
127
/// DHT-specific configuration carried by `NodeConfig`.
///
/// `P2PNode::new` maps these fields onto a `crate::dht::DHTConfig`:
/// - `k_value` becomes both bucket size and replication factor;
/// - `alpha_value` becomes lookup parallelism;
/// - `refresh_interval` becomes both the bucket refresh and the republish interval.
///
/// NOTE(review): in the visible part of `P2PNode::new`, that mapped config is
/// bound to `_dht_config` and never passed to `DHT::new(node_id)`. These values
/// therefore do not appear to reach the DHT instance; confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DHTConfig {
    /// Kademlia K parameter (bucket size)
    pub k_value: usize,

    /// Kademlia alpha parameter (parallelism)
    pub alpha_value: usize,

    /// DHT record TTL
    pub record_ttl: Duration,

    /// DHT refresh interval
    pub refresh_interval: Duration,
}
143
/// Security configuration for a node.
///
/// NOTE(review): nothing in this part of the module reads `enable_noise` or
/// `enable_tls`; confirm where (and whether) these flags are honored, since
/// transport encryption is provided by the QUIC layer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SecurityConfig {
    /// Enable noise protocol for encryption
    pub enable_noise: bool,

    /// Enable TLS for secure transport
    pub enable_tls: bool,

    /// Trust level for peer verification
    pub trust_level: TrustLevel,
}
156
/// Trust level for peer verification.
///
/// NOTE(review): this type derives `Serialize`/`Deserialize`. Positional
/// formats such as bincode encode variants by index, so keep the variant order
/// stable.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum TrustLevel {
    /// No verification required
    None,
    /// Basic peer ID verification
    Basic,
    /// Full cryptographic verification
    Full,
}
167
168// ============================================================================
169// Address Construction Helpers
170// ============================================================================
171
/// Produce the wildcard listen addresses for `port`.
///
/// The result always ends with the IPv4 wildcard (`0.0.0.0:port`). When
/// `ipv6_enabled` is true, the IPv6 wildcard (`[::]:port`) is placed before it.
/// Shared by the `NodeConfig` constructors so they build addresses identically.
#[inline]
fn build_listen_addrs(port: u16, ipv6_enabled: bool) -> Vec<std::net::SocketAddr> {
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};

    let ipv6_wildcard =
        ipv6_enabled.then(|| SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), port));
    let ipv4_wildcard = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port);

    ipv6_wildcard
        .into_iter()
        .chain(std::iter::once(ipv4_wildcard))
        .collect()
}
193
194impl NodeConfig {
195    /// Create a new NodeConfig with default values
196    ///
197    /// # Errors
198    ///
199    /// Returns an error if default addresses cannot be parsed
200    pub fn new() -> Result<Self> {
201        let config = Config::default();
202        let listen_addr = config.listen_socket_addr()?;
203
204        Ok(Self {
205            peer_id: None,
206            listen_addrs: build_listen_addrs(listen_addr.port(), config.network.ipv6_enabled),
207            listen_addr,
208            bootstrap_peers: Vec::new(),
209            bootstrap_peers_str: config.network.bootstrap_nodes.clone(),
210            enable_ipv6: config.network.ipv6_enabled,
211            connection_timeout: Duration::from_secs(config.network.connection_timeout),
212            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
213            max_connections: config.network.max_connections,
214            max_incoming_connections: config.security.connection_limit as usize,
215            dht_config: DHTConfig::default(),
216            security_config: SecurityConfig::default(),
217            production_config: None,
218            bootstrap_cache_config: None,
219            diversity_config: None,
220            stale_peer_threshold: default_stale_peer_threshold(),
221        })
222    }
223
224    /// Create a builder for customized NodeConfig construction
225    pub fn builder() -> NodeConfigBuilder {
226        NodeConfigBuilder::default()
227    }
228}
229
230// ============================================================================
231// NodeConfig Builder Pattern
232// ============================================================================
233
/// Builder for constructing `NodeConfig` with a fluent API.
///
/// Every option starts unset (`Default`). `NodeConfigBuilder::build` fills
/// unset options from `Config::default()` or the relevant type's `Default`.
/// `max_incoming_connections`, the bootstrap cache, the diversity config and
/// the stale-peer threshold cannot be set through this builder.
#[derive(Debug, Clone, Default)]
pub struct NodeConfigBuilder {
    peer_id: Option<PeerId>,
    listen_port: Option<u16>,
    enable_ipv6: Option<bool>,
    // Accumulated by repeated `bootstrap_peer` calls, in call order.
    bootstrap_peers: Vec<std::net::SocketAddr>,
    max_connections: Option<usize>,
    connection_timeout: Option<Duration>,
    keep_alive_interval: Option<Duration>,
    dht_config: Option<DHTConfig>,
    security_config: Option<SecurityConfig>,
    production_config: Option<ProductionConfig>,
}
248
249impl NodeConfigBuilder {
250    /// Set the peer ID
251    pub fn peer_id(mut self, peer_id: PeerId) -> Self {
252        self.peer_id = Some(peer_id);
253        self
254    }
255
256    /// Set the listen port
257    pub fn listen_port(mut self, port: u16) -> Self {
258        self.listen_port = Some(port);
259        self
260    }
261
262    /// Enable or disable IPv6
263    pub fn ipv6(mut self, enabled: bool) -> Self {
264        self.enable_ipv6 = Some(enabled);
265        self
266    }
267
268    /// Add a bootstrap peer
269    pub fn bootstrap_peer(mut self, addr: std::net::SocketAddr) -> Self {
270        self.bootstrap_peers.push(addr);
271        self
272    }
273
274    /// Set maximum connections
275    pub fn max_connections(mut self, max: usize) -> Self {
276        self.max_connections = Some(max);
277        self
278    }
279
280    /// Set connection timeout
281    pub fn connection_timeout(mut self, timeout: Duration) -> Self {
282        self.connection_timeout = Some(timeout);
283        self
284    }
285
286    /// Set keep-alive interval
287    pub fn keep_alive_interval(mut self, interval: Duration) -> Self {
288        self.keep_alive_interval = Some(interval);
289        self
290    }
291
292    /// Set DHT configuration
293    pub fn dht_config(mut self, config: DHTConfig) -> Self {
294        self.dht_config = Some(config);
295        self
296    }
297
298    /// Set security configuration
299    pub fn security_config(mut self, config: SecurityConfig) -> Self {
300        self.security_config = Some(config);
301        self
302    }
303
304    /// Set production configuration
305    pub fn production_config(mut self, config: ProductionConfig) -> Self {
306        self.production_config = Some(config);
307        self
308    }
309
310    /// Build the NodeConfig
311    ///
312    /// # Errors
313    ///
314    /// Returns an error if address construction fails
315    pub fn build(self) -> Result<NodeConfig> {
316        let base_config = Config::default();
317        let default_port = base_config
318            .listen_socket_addr()
319            .map(|addr| addr.port())
320            .unwrap_or(9000);
321        let port = self.listen_port.unwrap_or(default_port);
322        let ipv6_enabled = self.enable_ipv6.unwrap_or(base_config.network.ipv6_enabled);
323
324        let listen_addr =
325            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), port);
326
327        Ok(NodeConfig {
328            peer_id: self.peer_id,
329            listen_addrs: build_listen_addrs(port, ipv6_enabled),
330            listen_addr,
331            bootstrap_peers: self.bootstrap_peers.clone(),
332            bootstrap_peers_str: self.bootstrap_peers.iter().map(|a| a.to_string()).collect(),
333            enable_ipv6: ipv6_enabled,
334            connection_timeout: self
335                .connection_timeout
336                .unwrap_or(Duration::from_secs(base_config.network.connection_timeout)),
337            keep_alive_interval: self
338                .keep_alive_interval
339                .unwrap_or(Duration::from_secs(base_config.network.keepalive_interval)),
340            max_connections: self
341                .max_connections
342                .unwrap_or(base_config.network.max_connections),
343            max_incoming_connections: base_config.security.connection_limit as usize,
344            dht_config: self.dht_config.unwrap_or_default(),
345            security_config: self.security_config.unwrap_or_default(),
346            production_config: self.production_config,
347            bootstrap_cache_config: None,
348            diversity_config: None,
349            stale_peer_threshold: default_stale_peer_threshold(),
350        })
351    }
352}
353
354impl Default for NodeConfig {
355    fn default() -> Self {
356        let config = Config::default();
357        let listen_addr = config.listen_socket_addr().unwrap_or_else(|_| {
358            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 9000)
359        });
360
361        Self {
362            peer_id: None,
363            listen_addrs: build_listen_addrs(listen_addr.port(), config.network.ipv6_enabled),
364            listen_addr,
365            bootstrap_peers: Vec::new(),
366            bootstrap_peers_str: Vec::new(),
367            enable_ipv6: config.network.ipv6_enabled,
368            connection_timeout: Duration::from_secs(config.network.connection_timeout),
369            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
370            max_connections: config.network.max_connections,
371            max_incoming_connections: config.security.connection_limit as usize,
372            dht_config: DHTConfig::default(),
373            security_config: SecurityConfig::default(),
374            production_config: None,
375            bootstrap_cache_config: None,
376            diversity_config: None,
377            stale_peer_threshold: default_stale_peer_threshold(),
378        }
379    }
380}
381
382impl NodeConfig {
383    /// Create NodeConfig from Config
384    pub fn from_config(config: &Config) -> Result<Self> {
385        let listen_addr = config.listen_socket_addr()?;
386        let bootstrap_addrs = config.bootstrap_addrs()?;
387
388        let mut node_config = Self {
389            peer_id: None,
390            listen_addrs: vec![listen_addr],
391            listen_addr,
392            bootstrap_peers: bootstrap_addrs
393                .iter()
394                .map(|addr| addr.socket_addr())
395                .collect(),
396            bootstrap_peers_str: config
397                .network
398                .bootstrap_nodes
399                .iter()
400                .map(|addr| addr.to_string())
401                .collect(),
402            enable_ipv6: config.network.ipv6_enabled,
403
404            connection_timeout: Duration::from_secs(config.network.connection_timeout),
405            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
406            max_connections: config.network.max_connections,
407            max_incoming_connections: config.security.connection_limit as usize,
408            dht_config: DHTConfig {
409                k_value: 20,
410                alpha_value: 3,
411                record_ttl: Duration::from_secs(3600),
412                refresh_interval: Duration::from_secs(900),
413            },
414            security_config: SecurityConfig {
415                enable_noise: true,
416                enable_tls: true,
417                trust_level: TrustLevel::Basic,
418            },
419            production_config: Some(ProductionConfig {
420                max_connections: config.network.max_connections,
421                max_memory_bytes: 0,  // unlimited
422                max_bandwidth_bps: 0, // unlimited
423                connection_timeout: Duration::from_secs(config.network.connection_timeout),
424                keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
425                health_check_interval: Duration::from_secs(30),
426                metrics_interval: Duration::from_secs(60),
427                enable_performance_tracking: true,
428                enable_auto_cleanup: true,
429                shutdown_timeout: Duration::from_secs(30),
430                rate_limits: crate::production::RateLimitConfig::default(),
431            }),
432            bootstrap_cache_config: None,
433            diversity_config: None,
434            stale_peer_threshold: default_stale_peer_threshold(),
435        };
436
437        // Add IPv6 listen address if enabled
438        if config.network.ipv6_enabled {
439            node_config.listen_addrs.push(std::net::SocketAddr::new(
440                std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
441                listen_addr.port(),
442            ));
443        }
444
445        Ok(node_config)
446    }
447
448    /// Try to build a NodeConfig from a listen address string
449    pub fn with_listen_addr(addr: &str) -> Result<Self> {
450        let listen_addr: std::net::SocketAddr = addr
451            .parse()
452            .map_err(|e: std::net::AddrParseError| {
453                NetworkError::InvalidAddress(e.to_string().into())
454            })
455            .map_err(P2PError::Network)?;
456        let cfg = NodeConfig {
457            listen_addr,
458            listen_addrs: vec![listen_addr],
459            diversity_config: None,
460            ..Default::default()
461        };
462        Ok(cfg)
463    }
464}
465
466impl Default for DHTConfig {
467    fn default() -> Self {
468        Self {
469            k_value: 20,
470            alpha_value: 5,
471            record_ttl: Duration::from_secs(3600), // 1 hour
472            refresh_interval: Duration::from_secs(600), // 10 minutes
473        }
474    }
475}
476
477impl Default for SecurityConfig {
478    fn default() -> Self {
479        Self {
480            enable_noise: true,
481            enable_tls: true,
482            trust_level: TrustLevel::Basic,
483        }
484    }
485}
486
/// Information about a connected peer.
///
/// Timestamps are `tokio::time::Instant` values (imported at the top of this
/// module). They are only meaningful relative to other instants in the same
/// process; they are not wall-clock times.
#[derive(Debug, Clone)]
pub struct PeerInfo {
    /// Peer identifier
    pub peer_id: PeerId,

    /// Peer's addresses, in string form
    pub addresses: Vec<String>,

    /// When the connection was established
    pub connected_at: Instant,

    /// When the peer was last seen
    pub last_seen: Instant,

    /// Connection status
    pub status: ConnectionStatus,

    /// Supported protocols
    pub protocols: Vec<String>,

    /// Number of heartbeats received
    pub heartbeat_count: u64,
}
511
/// Connection status for a peer, as stored in `PeerInfo::status`.
#[derive(Debug, Clone, PartialEq)]
pub enum ConnectionStatus {
    /// Connection is being established
    Connecting,
    /// Connection is established and active
    Connected,
    /// Connection is being closed
    Disconnecting,
    /// Connection is closed
    Disconnected,
    /// Connection failed; the string carries a human-readable reason.
    Failed(String),
}
526
/// Low-level network events, covering connection lifecycle, raw messages and
/// DHT record activity.
///
/// NOTE(review): `P2PNode` broadcasts `P2PEvent` (its `event_tx` field is a
/// `broadcast::Sender<P2PEvent>`), not this type. Where `NetworkEvent` is
/// produced or consumed is not visible in this part of the module.
#[derive(Debug, Clone)]
pub enum NetworkEvent {
    /// A new peer has connected
    PeerConnected {
        /// The identifier of the newly connected peer
        peer_id: PeerId,
        /// The network addresses where the peer can be reached
        addresses: Vec<String>,
    },

    /// A peer has disconnected
    PeerDisconnected {
        /// The identifier of the disconnected peer
        peer_id: PeerId,
        /// The reason for the disconnection
        reason: String,
    },

    /// A message was received from a peer
    MessageReceived {
        /// The identifier of the sending peer
        peer_id: PeerId,
        /// The protocol used for the message
        protocol: String,
        /// The raw message data
        data: Vec<u8>,
    },

    /// A connection attempt failed
    ConnectionFailed {
        /// The identifier of the peer (if known)
        peer_id: Option<PeerId>,
        /// The address where connection was attempted
        address: String,
        /// The error message describing the failure
        error: String,
    },

    /// DHT record was stored
    DHTRecordStored {
        /// The DHT key where the record was stored
        key: Vec<u8>,
        /// The value that was stored
        value: Vec<u8>,
    },

    /// DHT record was retrieved
    DHTRecordRetrieved {
        /// The DHT key that was queried
        key: Vec<u8>,
        /// The retrieved value, if found
        value: Option<Vec<u8>>,
    },
}
582
/// Network events that can occur in the P2P system.
///
/// `P2PNode` publishes these on a `tokio::sync::broadcast` channel. Every
/// subscriber receives every event, provides real-time notice of peer
/// lifecycle changes and message arrivals. The channel is bounded: 1000 slots
/// in `P2PNode::new` and 16 in `P2PNode::new_for_tests`. A receiver that falls
/// behind will observe a lag error rather than the dropped events.
#[derive(Debug, Clone)]
pub enum P2PEvent {
    /// Message received from a peer on a specific topic
    Message {
        /// Topic or channel the message was sent on
        topic: String,
        /// Peer ID of the message sender
        source: PeerId,
        /// Raw message data payload
        data: Vec<u8>,
    },
    /// A new peer has connected to the network
    PeerConnected(PeerId),
    /// A peer has disconnected from the network
    PeerDisconnected(PeerId),
}
603
/// Main P2P network node that manages connections, routing, and communication.
///
/// This struct represents a complete P2P network participant that can:
/// - Connect to other peers via QUIC transport (dual-stack IPv6/IPv4 through ant-quic)
/// - Participate in distributed hash table (DHT) operations
/// - Send and receive messages through various protocols
/// - Handle network events and peer lifecycle
///
/// MCP (Model Context Protocol) support has been removed from this node and is
/// slated for a redesign.
///
/// NOTE(review): fields are dropped in declaration order, so reordering them
/// changes teardown order.
pub struct P2PNode {
    /// Node configuration
    config: NodeConfig,

    /// Our peer ID
    peer_id: PeerId,

    /// Connected peers
    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,

    /// Network event broadcaster
    event_tx: broadcast::Sender<P2PEvent>,

    /// Listen addresses
    listen_addrs: RwLock<Vec<std::net::SocketAddr>>,

    /// Node start time
    start_time: Instant,

    /// Running state
    running: RwLock<bool>,

    /// DHT instance (optional)
    dht: Option<Arc<RwLock<DHT>>>,

    /// Production resource manager; present when `NodeConfig::production_config` is set
    resource_manager: Option<Arc<ResourceManager>>,

    /// Bootstrap cache manager for peer discovery
    bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,

    /// Dual-stack ant-quic nodes (IPv6 + IPv4) with Happy Eyeballs dialing
    dual_node: Arc<DualStackNetworkNode>,

    /// Rate limiter for connection and request throttling
    rate_limiter: Arc<RateLimiter>,

    /// Active connections (tracked by peer_id)
    /// This set is synchronized with ant-quic's connection state via event monitoring
    active_connections: Arc<RwLock<HashSet<PeerId>>>,

    /// Security dashboard for monitoring
    pub security_dashboard: Option<Arc<crate::dht::metrics::SecurityDashboard>>,

    /// Connection lifecycle monitor task handle
    // NOTE(review): `dead_code` is allowed because the field is never read.
    // Presumably the handle is kept so the task could be awaited or aborted
    // later; confirm.
    #[allow(dead_code)]
    connection_monitor_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,

    /// Keepalive task handle
    // NOTE(review): never read (hence `dead_code`); see the note on
    // `connection_monitor_handle`.
    #[allow(dead_code)]
    keepalive_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,

    /// Periodic maintenance task handle
    // NOTE(review): never read (hence `dead_code`); see the note on
    // `connection_monitor_handle`.
    #[allow(dead_code)]
    periodic_tasks_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,

    /// Shutdown flag for background tasks
    shutdown: Arc<AtomicBool>,

    /// Message receive task handles (recv tasks + consumer)
    recv_handles: Arc<RwLock<Vec<tokio::task::JoinHandle<()>>>>,

    /// Accept loop task handle for graceful shutdown
    listener_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,

    /// GeoIP provider for connection validation
    // NOTE(review): never read in the compiled code (hence `dead_code`).
    #[allow(dead_code)]
    geo_provider: Arc<BgpGeoProvider>,

    /// Bootstrap state tracking - indicates whether peer discovery has completed
    is_bootstrapped: Arc<AtomicBool>,

    /// EigenTrust engine for reputation management
    ///
    /// Used to track peer reliability based on data availability outcomes.
    /// Consumers (like saorsa-node) should report successes and failures
    /// via `report_peer_success()` and `report_peer_failure()` methods.
    #[cfg(feature = "adaptive-ml")]
    trust_engine: Option<Arc<EigenTrustEngine>>,
}
693
/// Map a wildcard bind address onto the matching loopback address.
///
/// Unspecified addresses (`0.0.0.0` and `[::]`) are only valid for binding, and
/// ant-quic rejects them as connection targets. This helper rewrites them so a
/// node's own listen address can be dialled locally:
/// - `0.0.0.0:port` becomes `127.0.0.1:port`;
/// - `[::]:port` becomes `[::1]:port`.
///
/// The port is preserved. Any non-wildcard address is returned unchanged.
fn normalize_wildcard_to_loopback(addr: std::net::SocketAddr) -> std::net::SocketAddr {
    use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};

    // Concrete addresses are already dialable.
    if !addr.ip().is_unspecified() {
        return addr;
    }

    let port = addr.port();
    match addr {
        SocketAddr::V4(_) => SocketAddr::from((Ipv4Addr::LOCALHOST, port)),
        SocketAddr::V6(_) => SocketAddr::from((Ipv6Addr::LOCALHOST, port)),
    }
}
724
725impl P2PNode {
726    /// Minimal constructor for tests that avoids real networking
727    pub fn new_for_tests() -> Result<Self> {
728        let (event_tx, _) = broadcast::channel(16);
729        Ok(Self {
730            config: NodeConfig::default(),
731            peer_id: "test_peer".to_string(),
732            peers: Arc::new(RwLock::new(HashMap::new())),
733            event_tx,
734            listen_addrs: RwLock::new(Vec::new()),
735            start_time: Instant::now(),
736            running: RwLock::new(false),
737            dht: None,
738            resource_manager: None,
739            bootstrap_manager: None,
740            dual_node: {
741                // Bind dual-stack nodes on ephemeral ports for tests
742                let v6: Option<std::net::SocketAddr> = "[::1]:0"
743                    .parse()
744                    .ok()
745                    .or(Some(std::net::SocketAddr::from(([0, 0, 0, 0], 0))));
746                let v4: Option<std::net::SocketAddr> = "127.0.0.1:0".parse().ok();
747                let handle = tokio::runtime::Handle::current();
748                let dual_attempt = handle.block_on(
749                    crate::transport::ant_quic_adapter::DualStackNetworkNode::new(v6, v4),
750                );
751                let dual = match dual_attempt {
752                    Ok(d) => d,
753                    Err(_e1) => {
754                        // Fallback to IPv4-only ephemeral bind
755                        let fallback = handle.block_on(
756                            crate::transport::ant_quic_adapter::DualStackNetworkNode::new(
757                                None,
758                                "127.0.0.1:0".parse().ok(),
759                            ),
760                        );
761                        match fallback {
762                            Ok(d) => d,
763                            Err(e2) => {
764                                return Err(P2PError::Network(NetworkError::BindError(
765                                    format!("Failed to create dual-stack network node: {}", e2)
766                                        .into(),
767                                )));
768                            }
769                        }
770                    }
771                };
772                Arc::new(dual)
773            },
774            rate_limiter: Arc::new(RateLimiter::new(RateLimitConfig {
775                max_requests: 100,
776                burst_size: 100,
777                window: std::time::Duration::from_secs(1),
778                ..Default::default()
779            })),
780            active_connections: Arc::new(RwLock::new(HashSet::new())),
781            connection_monitor_handle: Arc::new(RwLock::new(None)),
782            keepalive_handle: Arc::new(RwLock::new(None)),
783            periodic_tasks_handle: Arc::new(RwLock::new(None)),
784            shutdown: Arc::new(AtomicBool::new(false)),
785            recv_handles: Arc::new(RwLock::new(Vec::new())),
786            listener_handle: Arc::new(RwLock::new(None)),
787            geo_provider: Arc::new(BgpGeoProvider::new()),
788            security_dashboard: None,
789            is_bootstrapped: Arc::new(AtomicBool::new(false)),
790            #[cfg(feature = "adaptive-ml")]
791            trust_engine: None,
792        })
793    }
794    /// Create a new P2P node with the given configuration
795    pub async fn new(config: NodeConfig) -> Result<Self> {
796        let peer_id = config.peer_id.clone().unwrap_or_else(|| {
797            // Generate a random peer ID for now
798            // Safe: UUID v4 canonical format is always 36 characters
799            let uuid_str = uuid::Uuid::new_v4().to_string();
800            format!("peer_{}", &uuid_str[..8])
801        });
802
803        let (event_tx, _) = broadcast::channel(1000);
804
805        // Initialize and register a TrustWeightedKademlia DHT for the global API
806        // Use a deterministic local NodeId derived from the peer_id
807        {
808            use blake3::Hasher;
809            let mut hasher = Hasher::new();
810            hasher.update(peer_id.as_bytes());
811            let digest = hasher.finalize();
812            let mut nid = [0u8; 32];
813            nid.copy_from_slice(digest.as_bytes());
814            let _twdht = std::sync::Arc::new(crate::dht::TrustWeightedKademlia::new(
815                crate::identity::node_identity::NodeId::from_bytes(nid),
816            ));
817            // TODO: Update to use new clean API
818            // let _ = crate::api::set_dht_instance(twdht);
819        }
820
821        // Initialize DHT if needed
822        let (dht, security_dashboard) = if true {
823            // Assuming DHT is always enabled for now, or check config
824            let _dht_config = crate::dht::DHTConfig {
825                replication_factor: config.dht_config.k_value,
826                bucket_size: config.dht_config.k_value,
827                alpha: config.dht_config.alpha_value,
828                record_ttl: config.dht_config.record_ttl,
829                bucket_refresh_interval: config.dht_config.refresh_interval,
830                republish_interval: config.dht_config.refresh_interval,
831                max_distance: 160,
832            };
833            // Convert peer_id String to NodeId
834            let peer_bytes = peer_id.as_bytes();
835            let mut node_id_bytes = [0u8; 32];
836            let len = peer_bytes.len().min(32);
837            node_id_bytes[..len].copy_from_slice(&peer_bytes[..len]);
838            let node_id = crate::dht::core_engine::NodeId::from_bytes(node_id_bytes);
839            let dht_instance = DHT::new(node_id).map_err(|e| {
840                crate::error::P2PError::Dht(crate::error::DhtError::StoreFailed(
841                    e.to_string().into(),
842                ))
843            })?;
844            dht_instance.start_maintenance_tasks();
845
846            // Create Security Dashboard
847            let security_metrics = dht_instance.security_metrics();
848            let dashboard = crate::dht::metrics::SecurityDashboard::new(
849                security_metrics,
850                Arc::new(crate::dht::metrics::DhtMetricsCollector::new()),
851                Arc::new(crate::dht::metrics::TrustMetricsCollector::new()),
852                Arc::new(crate::dht::metrics::PlacementMetricsCollector::new()),
853            );
854
855            (
856                Some(Arc::new(RwLock::new(dht_instance))),
857                Some(Arc::new(dashboard)),
858            )
859        } else {
860            (None, None)
861        };
862
863        // MCP removed
864
865        // Initialize production resource manager if configured
866        let resource_manager = config
867            .production_config
868            .clone()
869            .map(|prod_config| Arc::new(ResourceManager::new(prod_config)));
870
871        // Initialize bootstrap cache manager
872        let diversity_config = config.diversity_config.clone().unwrap_or_default();
873        let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
874            match BootstrapManager::with_full_config(
875                cache_config.clone(),
876                crate::rate_limit::JoinRateLimiterConfig::default(),
877                diversity_config.clone(),
878            )
879            .await
880            {
881                Ok(manager) => Some(Arc::new(RwLock::new(manager))),
882                Err(e) => {
883                    warn!(
884                        "Failed to initialize bootstrap manager: {}, continuing without cache",
885                        e
886                    );
887                    None
888                }
889            }
890        } else {
891            match BootstrapManager::with_full_config(
892                crate::bootstrap::CacheConfig::default(),
893                crate::rate_limit::JoinRateLimiterConfig::default(),
894                diversity_config,
895            )
896            .await
897            {
898                Ok(manager) => Some(Arc::new(RwLock::new(manager))),
899                Err(e) => {
900                    warn!(
901                        "Failed to initialize bootstrap manager: {}, continuing without cache",
902                        e
903                    );
904                    None
905                }
906            }
907        };
908
909        // Initialize EigenTrust engine for reputation management (only with adaptive-ml feature)
910        // Pre-trusted nodes are the bootstrap nodes (they start with high trust)
911        #[cfg(feature = "adaptive-ml")]
912        let trust_engine = {
913            use crate::adaptive::NodeId;
914            use std::collections::HashSet;
915
916            // Convert bootstrap peers to NodeIds for pre-trusted set
917            // TODO: Bootstrap peer addresses are hashed to create placeholder NodeIds here.
918            // The actual peer IDs differ from these hashes. This is a temporary solution -
919            // the pre-trusted set will be updated with real peer IDs when actual connections
920            // are established. A proper fix requires passing real peer IDs from the connection
921            // layer, which needs architectural changes.
922            let mut pre_trusted = HashSet::new();
923            for bootstrap_peer in &config.bootstrap_peers_str {
924                // Hash the bootstrap peer address to create a placeholder NodeId
925                let hash = blake3::hash(bootstrap_peer.as_bytes());
926                let mut node_id_bytes = [0u8; 32];
927                node_id_bytes.copy_from_slice(hash.as_bytes());
928                pre_trusted.insert(NodeId::from_bytes(node_id_bytes));
929            }
930
931            let engine = Arc::new(EigenTrustEngine::new(pre_trusted));
932            // Start background trust computation (every 5 minutes)
933            engine.clone().start_background_updates();
934            Some(engine)
935        };
936
937        // Initialize dual-stack ant-quic nodes
938        // Determine bind addresses
939        let (v6_opt, v4_opt) = {
940            let port = config.listen_addr.port();
941            let ip = config.listen_addr.ip();
942
943            let v4_addr = if ip.is_ipv4() {
944                Some(std::net::SocketAddr::new(ip, port))
945            } else {
946                // If config is IPv6, we still might want IPv4 on UNSPECIFIED if dual stack is desired
947                // But for now let's just stick to defaults if not specified
948                Some(std::net::SocketAddr::new(
949                    std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
950                    port,
951                ))
952            };
953
954            let v6_addr = if config.enable_ipv6 {
955                if ip.is_ipv6() {
956                    Some(std::net::SocketAddr::new(ip, port))
957                } else {
958                    Some(std::net::SocketAddr::new(
959                        std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
960                        port,
961                    ))
962                }
963            } else {
964                None
965            };
966            (v6_addr, v4_addr)
967        };
968
969        let dual_node = Arc::new(
970            DualStackNetworkNode::new_with_max_connections(v6_opt, v4_opt, config.max_connections)
971                .await
972                .map_err(|e| {
973                    P2PError::Transport(crate::error::TransportError::SetupFailed(
974                        format!("Failed to create dual-stack network nodes: {}", e).into(),
975                    ))
976                })?,
977        );
978
979        // Initialize rate limiter with default config
980        let rate_limiter = Arc::new(RateLimiter::new(
981            crate::validation::RateLimitConfig::default(),
982        ));
983
984        // Create active connections tracker
985        let active_connections = Arc::new(RwLock::new(HashSet::new()));
986
987        // Initialize GeoIP provider
988        let geo_provider = Arc::new(BgpGeoProvider::new());
989
990        // Create peers map
991        let peers = Arc::new(RwLock::new(HashMap::new()));
992
993        // Start connection lifecycle monitor
994        // CRITICAL: Subscribe to connection events BEFORE spawning the task
995        // to avoid race condition where early connections are missed
996        let connection_event_rx = dual_node.subscribe_connection_events();
997
998        let connection_monitor_handle = {
999            let active_conns = Arc::clone(&active_connections);
1000            let peers_map = Arc::clone(&peers);
1001            let event_tx_clone = event_tx.clone();
1002            let dual_node_clone = Arc::clone(&dual_node);
1003            let geo_provider_clone = Arc::clone(&geo_provider);
1004            let peer_id_clone = peer_id.clone();
1005
1006            let handle = tokio::spawn(async move {
1007                Self::connection_lifecycle_monitor_with_rx(
1008                    dual_node_clone,
1009                    connection_event_rx,
1010                    active_conns,
1011                    peers_map,
1012                    event_tx_clone,
1013                    geo_provider_clone,
1014                    peer_id_clone,
1015                )
1016                .await;
1017            });
1018
1019            Arc::new(RwLock::new(Some(handle)))
1020        };
1021
1022        // Spawn keepalive task
1023        let shutdown = Arc::new(AtomicBool::new(false));
1024        let keepalive_handle = {
1025            let active_conns = Arc::clone(&active_connections);
1026            let dual_node_clone = Arc::clone(&dual_node);
1027            let shutdown_clone = Arc::clone(&shutdown);
1028
1029            let handle = tokio::spawn(async move {
1030                Self::keepalive_task(active_conns, dual_node_clone, shutdown_clone).await;
1031            });
1032
1033            Arc::new(RwLock::new(Some(handle)))
1034        };
1035
1036        // Spawn periodic maintenance task for stale peer detection
1037        let periodic_tasks_handle = {
1038            let peers_clone = Arc::clone(&peers);
1039            let active_conns_clone = Arc::clone(&active_connections);
1040            let event_tx_clone = event_tx.clone();
1041            let stale_threshold = config.stale_peer_threshold;
1042            let shutdown_clone = Arc::clone(&shutdown);
1043
1044            let handle = tokio::spawn(async move {
1045                Self::periodic_maintenance_task(
1046                    peers_clone,
1047                    active_conns_clone,
1048                    event_tx_clone,
1049                    stale_threshold,
1050                    shutdown_clone,
1051                )
1052                .await;
1053            });
1054
1055            Arc::new(RwLock::new(Some(handle)))
1056        };
1057
1058        let node = Self {
1059            config,
1060            peer_id,
1061            peers,
1062            event_tx,
1063            listen_addrs: RwLock::new(Vec::new()),
1064            start_time: Instant::now(),
1065            running: RwLock::new(false),
1066            dht,
1067            resource_manager,
1068            bootstrap_manager,
1069            dual_node,
1070            rate_limiter,
1071            active_connections,
1072            security_dashboard,
1073            connection_monitor_handle,
1074            keepalive_handle,
1075            periodic_tasks_handle,
1076            shutdown,
1077            recv_handles: Arc::new(RwLock::new(Vec::new())),
1078            listener_handle: Arc::new(RwLock::new(None)),
1079            geo_provider,
1080            is_bootstrapped: Arc::new(AtomicBool::new(false)),
1081            #[cfg(feature = "adaptive-ml")]
1082            trust_engine,
1083        };
1084        info!(
1085            "Created P2P node with peer ID: {} (call start() to begin networking)",
1086            node.peer_id
1087        );
1088
1089        Ok(node)
1090    }
1091
1092    /// Create a new node builder
1093    pub fn builder() -> NodeBuilder {
1094        NodeBuilder::new()
1095    }
1096
1097    /// Get the peer ID of this node
1098    pub fn peer_id(&self) -> &PeerId {
1099        &self.peer_id
1100    }
1101
1102    /// Get the hex-encoded transport-level peer ID.
1103    ///
1104    /// This is the ID used in `P2PEvent::Message.source`, `connected_peers()`,
1105    /// and `send_message()`. It differs from `peer_id()` which is the app-level ID.
1106    ///
1107    /// Returns the transport peer ID from whichever stack (v4 or v6) is active.
1108    pub fn transport_peer_id(&self) -> Option<String> {
1109        if let Some(ref v4) = self.dual_node.v4 {
1110            return Some(ant_peer_id_to_string(&v4.our_peer_id()));
1111        }
1112        if let Some(ref v6) = self.dual_node.v6 {
1113            return Some(ant_peer_id_to_string(&v6.our_peer_id()));
1114        }
1115        None
1116    }
1117
1118    pub fn local_addr(&self) -> Option<String> {
1119        self.listen_addrs
1120            .try_read()
1121            .ok()
1122            .and_then(|addrs| addrs.first().map(|a| a.to_string()))
1123    }
1124
1125    /// Check if the node has completed the initial bootstrap process
1126    ///
1127    /// Returns `true` if the node has successfully connected to at least one
1128    /// bootstrap peer and performed peer discovery (FIND_NODE).
1129    pub fn is_bootstrapped(&self) -> bool {
1130        self.is_bootstrapped.load(Ordering::SeqCst)
1131    }
1132
1133    /// Manually trigger re-bootstrap (useful for recovery or network rejoin)
1134    ///
1135    /// This clears the bootstrapped state and attempts to reconnect to
1136    /// bootstrap peers and discover new peers.
1137    pub async fn re_bootstrap(&self) -> Result<()> {
1138        self.is_bootstrapped.store(false, Ordering::SeqCst);
1139        self.connect_bootstrap_peers().await
1140    }
1141
1142    // =========================================================================
1143    // Trust API - EigenTrust Reputation System (requires adaptive-ml feature)
1144    // =========================================================================
1145
1146    /// Get the EigenTrust engine for direct trust operations
1147    ///
1148    /// This provides access to the underlying trust engine for advanced use cases.
1149    /// For simple success/failure reporting, prefer `report_peer_success()` and
1150    /// `report_peer_failure()`.
1151    ///
1152    /// Requires the `adaptive-ml` feature to be enabled.
1153    ///
1154    /// # Example
1155    ///
1156    /// ```rust,ignore
1157    /// if let Some(engine) = node.trust_engine() {
1158    ///     // Update node statistics directly
1159    ///     engine.update_node_stats(&peer_id, NodeStatisticsUpdate::StorageContributed(1024)).await;
1160    ///
1161    ///     // Get global trust scores
1162    ///     let scores = engine.compute_global_trust().await;
1163    /// }
1164    /// ```
1165    #[cfg(feature = "adaptive-ml")]
1166    pub fn trust_engine(&self) -> Option<Arc<EigenTrustEngine>> {
1167        self.trust_engine.clone()
1168    }
1169
1170    /// Canonical conversion from PeerId string to adaptive NodeId for trust.
1171    ///
1172    /// PeerId strings are hex-encoded 32-byte identifiers. This decodes them
1173    /// back to raw bytes, matching the DHT NodeId representation used by
1174    /// `trust_peer_selector`. Falls back to blake3 hash for non-hex IDs.
1175    #[cfg(feature = "adaptive-ml")]
1176    fn peer_id_to_trust_node_id(peer_id: &str) -> AdaptiveNodeId {
1177        if let Ok(bytes) = hex::decode(peer_id)
1178            && bytes.len() == 32
1179        {
1180            let mut arr = [0u8; 32];
1181            arr.copy_from_slice(&bytes);
1182            return AdaptiveNodeId::from_bytes(arr);
1183        }
1184        // Non-hex or wrong length: hash to 32 bytes as fallback
1185        let hash = blake3::hash(peer_id.as_bytes());
1186        AdaptiveNodeId::from_bytes(*hash.as_bytes())
1187    }
1188
1189    /// Report a successful interaction with a peer
1190    ///
1191    /// Call this after successful data operations to increase the peer's trust score.
1192    /// This is the primary method for saorsa-node to report positive outcomes.
1193    ///
1194    /// Requires the `adaptive-ml` feature to be enabled.
1195    ///
1196    /// # Arguments
1197    ///
1198    /// * `peer_id` - The peer ID (as a string) of the node that performed well
1199    ///
1200    /// # Example
1201    ///
1202    /// ```rust,ignore
1203    /// // After successfully retrieving a chunk from a peer
1204    /// if let Ok(chunk) = fetch_chunk_from(&peer_id).await {
1205    ///     node.report_peer_success(&peer_id).await?;
1206    /// }
1207    /// ```
1208    #[cfg(feature = "adaptive-ml")]
1209    pub async fn report_peer_success(&self, peer_id: &str) -> Result<()> {
1210        if let Some(ref engine) = self.trust_engine {
1211            let node_id = Self::peer_id_to_trust_node_id(peer_id);
1212
1213            engine
1214                .update_node_stats(&node_id, NodeStatisticsUpdate::CorrectResponse)
1215                .await;
1216            Ok(())
1217        } else {
1218            // Trust engine not initialized - this is not an error, just a no-op
1219            Ok(())
1220        }
1221    }
1222
1223    /// Report a failed interaction with a peer
1224    ///
1225    /// Call this after failed data operations to decrease the peer's trust score.
1226    /// This includes timeouts, corrupted data, or refused connections.
1227    ///
1228    /// Requires the `adaptive-ml` feature to be enabled.
1229    ///
1230    /// # Arguments
1231    ///
1232    /// * `peer_id` - The peer ID (as a string) of the node that failed
1233    ///
1234    /// # Example
1235    ///
1236    /// ```rust,ignore
1237    /// // After a chunk retrieval fails
1238    /// match fetch_chunk_from(&peer_id).await {
1239    ///     Ok(chunk) => node.report_peer_success(&peer_id).await?,
1240    ///     Err(_) => node.report_peer_failure(&peer_id).await?,
1241    /// }
1242    /// ```
1243    #[cfg(feature = "adaptive-ml")]
1244    pub async fn report_peer_failure(&self, peer_id: &str) -> Result<()> {
1245        if let Some(ref engine) = self.trust_engine {
1246            let node_id = Self::peer_id_to_trust_node_id(peer_id);
1247
1248            engine
1249                .update_node_stats(&node_id, NodeStatisticsUpdate::FailedResponse)
1250                .await;
1251            Ok(())
1252        } else {
1253            // Trust engine not initialized - this is not an error, just a no-op
1254            Ok(())
1255        }
1256    }
1257
1258    /// Get the current trust score for a peer
1259    ///
1260    /// Returns a value between 0.0 (untrusted) and 1.0 (fully trusted).
1261    /// Unknown peers return 0.0 by default.
1262    ///
1263    /// Requires the `adaptive-ml` feature to be enabled.
1264    ///
1265    /// # Arguments
1266    ///
1267    /// * `peer_id` - The peer ID (as a string) to query
1268    ///
1269    /// # Example
1270    ///
1271    /// ```rust,ignore
1272    /// let trust = node.peer_trust(&peer_id);
1273    /// if trust < 0.3 {
1274    ///     tracing::warn!("Low trust peer: {}", peer_id);
1275    /// }
1276    /// ```
1277    #[cfg(feature = "adaptive-ml")]
1278    pub fn peer_trust(&self, peer_id: &str) -> f64 {
1279        if let Some(ref engine) = self.trust_engine {
1280            let node_id = Self::peer_id_to_trust_node_id(peer_id);
1281
1282            use crate::adaptive::TrustProvider;
1283            engine.get_trust(&node_id)
1284        } else {
1285            // Trust engine not initialized - return neutral trust
1286            0.5
1287        }
1288    }
1289
1290    pub async fn subscribe(&self, topic: &str) -> Result<()> {
1291        // In a real implementation, this would register the topic with the pubsub mechanism.
1292        // For now, we just log it.
1293        info!("Subscribed to topic: {}", topic);
1294        Ok(())
1295    }
1296
1297    pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
1298        info!(
1299            "Publishing message to topic: {} ({} bytes)",
1300            topic,
1301            data.len()
1302        );
1303
1304        // Get list of connected peers
1305        let peer_list: Vec<PeerId> = {
1306            let peers_guard = self.peers.read().await;
1307            peers_guard.keys().cloned().collect()
1308        };
1309
1310        if peer_list.is_empty() {
1311            debug!("No peers connected, message will only be sent to local subscribers");
1312        } else {
1313            // Send message to all connected peers
1314            let mut send_count = 0;
1315            for peer_id in &peer_list {
1316                match self.send_message(peer_id, topic, data.to_vec()).await {
1317                    Ok(_) => {
1318                        send_count += 1;
1319                        debug!("Sent message to peer: {}", peer_id);
1320                    }
1321                    Err(e) => {
1322                        warn!("Failed to send message to peer {}: {}", peer_id, e);
1323                    }
1324                }
1325            }
1326            info!(
1327                "Published message to {}/{} connected peers",
1328                send_count,
1329                peer_list.len()
1330            );
1331        }
1332
1333        // Also send to local subscribers (for local echo and testing)
1334        self.send_event(P2PEvent::Message {
1335            topic: topic.to_string(),
1336            source: self.peer_id.clone(),
1337            data: data.to_vec(),
1338        });
1339
1340        Ok(())
1341    }
1342
1343    /// Get the node configuration
1344    pub fn config(&self) -> &NodeConfig {
1345        &self.config
1346    }
1347
1348    /// Start the P2P node
1349    pub async fn start(&self) -> Result<()> {
1350        info!("Starting P2P node...");
1351
1352        // Start production resource manager if configured
1353        if let Some(ref resource_manager) = self.resource_manager {
1354            resource_manager.start().await.map_err(|e| {
1355                P2PError::Network(crate::error::NetworkError::ProtocolError(
1356                    format!("Failed to start resource manager: {e}").into(),
1357                ))
1358            })?;
1359            info!("Production resource manager started");
1360        }
1361
1362        // Start bootstrap manager background tasks
1363        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1364            let mut manager = bootstrap_manager.write().await;
1365            manager.start_background_tasks().await.map_err(|e| {
1366                P2PError::Network(crate::error::NetworkError::ProtocolError(
1367                    format!("Failed to start bootstrap manager: {e}").into(),
1368                ))
1369            })?;
1370            info!("Bootstrap cache manager started");
1371        }
1372
1373        // Set running state
1374        *self.running.write().await = true;
1375
1376        // Start listening on configured addresses using transport layer
1377        self.start_network_listeners().await?;
1378
1379        // Update the connection monitor with actual peers reference
1380        self.start_connection_monitor().await;
1381
1382        // Log current listen addresses
1383        let listen_addrs = self.listen_addrs.read().await;
1384        info!("P2P node started on addresses: {:?}", *listen_addrs);
1385
1386        // NOTE: Message receiving is now integrated into the accept loop in start_network_listeners()
1387        // The old start_message_receiving_system() is no longer needed as it competed with the accept
1388        // loop for incoming connections, causing messages to be lost.
1389
1390        // Connect to bootstrap peers
1391        self.connect_bootstrap_peers().await?;
1392
1393        Ok(())
1394    }
1395
1396    /// Start network listeners on configured addresses
1397    async fn start_network_listeners(&self) -> Result<()> {
1398        info!("Starting dual-stack listeners (ant-quic)...");
1399        // Update our listen_addrs from the dual node bindings
1400        let addrs = self.dual_node.local_addrs().await.map_err(|e| {
1401            P2PError::Transport(crate::error::TransportError::SetupFailed(
1402                format!("Failed to get local addresses: {}", e).into(),
1403            ))
1404        })?;
1405        {
1406            let mut la = self.listen_addrs.write().await;
1407            *la = addrs.clone();
1408        }
1409
1410        // Spawn a background accept loop that handles incoming connections from either stack.
1411        // This only handles peer registration. Message data is received separately
1412        // via start_message_receiving_system() which uses endpoint().recv().
1413        let event_tx = self.event_tx.clone();
1414        let peers = self.peers.clone();
1415        let active_connections = self.active_connections.clone();
1416        let rate_limiter = self.rate_limiter.clone();
1417        let dual = self.dual_node.clone();
1418        let shutdown = Arc::clone(&self.shutdown);
1419        let handle = tokio::spawn(async move {
1420            loop {
1421                if shutdown.load(Ordering::Relaxed) {
1422                    break;
1423                }
1424                match dual.accept_any().await {
1425                    Ok((ant_peer_id, remote_sock)) => {
1426                        // Enforce rate limiting
1427                        if let Err(e) = rate_limiter.check_ip(&remote_sock.ip()) {
1428                            warn!(
1429                                "Rate-limited incoming connection from {}: {}",
1430                                remote_sock, e
1431                            );
1432                            continue;
1433                        }
1434
1435                        let peer_id =
1436                            crate::transport::ant_quic_adapter::ant_peer_id_to_string(&ant_peer_id);
1437                        let remote_addr = NetworkAddress::from(remote_sock);
1438                        broadcast_event(&event_tx, P2PEvent::PeerConnected(peer_id.clone()));
1439                        register_new_peer(&peers, &peer_id, &remote_addr).await;
1440                        active_connections.write().await.insert(peer_id);
1441                    }
1442                    Err(e) => {
1443                        warn!("Accept failed: {}", e);
1444                        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1445                    }
1446                }
1447            }
1448        });
1449        *self.listener_handle.write().await = Some(handle);
1450
1451        // Start the recv-based message receiving system.
1452        // This uses endpoint().recv() which matches endpoint().send() used by send_message().
1453        // The accept loop above handles incoming connections (peer registration) and DHT
1454        // typed streams, while this handles general P2P messaging via the raw endpoint API.
1455        self.start_message_receiving_system().await?;
1456
1457        info!("Dual-stack listeners active on: {:?}", addrs);
1458        Ok(())
1459    }
1460
1461    /// Spawns per-stack recv tasks that feed into a single mpsc channel,
1462    /// then a dispatcher task that parses incoming bytes and emits P2P events.
1463    async fn start_message_receiving_system(&self) -> Result<()> {
1464        info!("Starting message receiving system");
1465
1466        let (tx, mut rx) = tokio::sync::mpsc::channel(MESSAGE_RECV_CHANNEL_CAPACITY);
1467        let shutdown = Arc::clone(&self.shutdown);
1468
1469        let mut handles = Vec::new();
1470
1471        // Spawn per-stack recv tasks — both feed into the same channel
1472        if let Some(v6) = self.dual_node.v6.as_ref() {
1473            handles.push(v6.spawn_recv_task(tx.clone(), Arc::clone(&shutdown)));
1474        }
1475        if let Some(v4) = self.dual_node.v4.as_ref() {
1476            handles.push(v4.spawn_recv_task(tx.clone(), Arc::clone(&shutdown)));
1477        }
1478        drop(tx); // drop original sender so rx closes when all tasks exit
1479
1480        let event_tx = self.event_tx.clone();
1481        handles.push(tokio::spawn(async move {
1482            info!("Message receive loop started");
1483            while let Some((peer_id, bytes)) = rx.recv().await {
1484                let transport_peer_id =
1485                    crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1486                trace!(
1487                    "Received {} bytes from peer {}",
1488                    bytes.len(),
1489                    transport_peer_id
1490                );
1491
1492                if bytes == KEEPALIVE_PAYLOAD {
1493                    trace!("Received keepalive from {}", transport_peer_id);
1494                    continue;
1495                }
1496
1497                match parse_protocol_message(&bytes, &transport_peer_id) {
1498                    Some(event) => broadcast_event(&event_tx, event),
1499                    None => {
1500                        warn!("Failed to parse protocol message ({} bytes)", bytes.len());
1501                    }
1502                }
1503            }
1504            info!("Message receive loop ended — channel closed");
1505        }));
1506
1507        *self.recv_handles.write().await = handles;
1508
1509        Ok(())
1510    }
1511
1512    /// Run the P2P node (blocks until shutdown)
1513    pub async fn run(&self) -> Result<()> {
1514        if !*self.running.read().await {
1515            self.start().await?;
1516        }
1517
1518        info!("P2P node running...");
1519
1520        // Main event loop
1521        loop {
1522            if !*self.running.read().await {
1523                break;
1524            }
1525
1526            // Perform periodic tasks
1527            self.periodic_tasks().await?;
1528
1529            // Sleep for a short interval
1530            tokio::time::sleep(Duration::from_millis(100)).await;
1531        }
1532
1533        info!("P2P node stopped");
1534        Ok(())
1535    }
1536
1537    /// Stop the P2P node
1538    pub async fn stop(&self) -> Result<()> {
1539        info!("Stopping P2P node...");
1540
1541        // Signal all background tasks to stop
1542        self.shutdown.store(true, Ordering::Relaxed);
1543
1544        // Set running state to false
1545        *self.running.write().await = false;
1546
1547        // Shut down the transport endpoints first. This cancels the
1548        // endpoint CancellationToken which unblocks any recv() calls
1549        // and aborts per-connection reader tasks inside ant-quic.
1550        // Note: ant-quic >=0.20 races recv() against the shutdown token
1551        // internally, so this is belt-and-suspenders -- but calling it
1552        // first is still the safest ordering.
1553        self.dual_node.shutdown_endpoints().await;
1554
1555        // Await recv system tasks
1556        let handles: Vec<_> = self.recv_handles.write().await.drain(..).collect();
1557        for handle in handles {
1558            match handle.await {
1559                Ok(()) => {}
1560                Err(e) if e.is_cancelled() => {
1561                    tracing::debug!("Recv task was cancelled during shutdown");
1562                }
1563                Err(e) if e.is_panic() => {
1564                    tracing::error!("Recv task panicked during shutdown: {:?}", e);
1565                }
1566                Err(e) => {
1567                    tracing::warn!("Recv task join error during shutdown: {:?}", e);
1568                }
1569            }
1570        }
1571
1572        // Await accept loop task
1573        if let Some(handle) = self.listener_handle.write().await.take() {
1574            match handle.await {
1575                Ok(()) => {}
1576                Err(e) if e.is_cancelled() => {
1577                    tracing::debug!("Listener task was cancelled during shutdown");
1578                }
1579                Err(e) if e.is_panic() => {
1580                    tracing::error!("Listener task panicked during shutdown: {:?}", e);
1581                }
1582                Err(e) => {
1583                    tracing::warn!("Listener task join error during shutdown: {:?}", e);
1584                }
1585            }
1586        }
1587
1588        // Disconnect all peers
1589        self.disconnect_all_peers().await?;
1590
1591        // Shutdown production resource manager if configured
1592        if let Some(ref resource_manager) = self.resource_manager {
1593            resource_manager.shutdown().await.map_err(|e| {
1594                P2PError::Network(crate::error::NetworkError::ProtocolError(
1595                    format!("Failed to shutdown resource manager: {e}").into(),
1596                ))
1597            })?;
1598            info!("Production resource manager stopped");
1599        }
1600
1601        info!("P2P node stopped");
1602        Ok(())
1603    }
1604
1605    /// Graceful shutdown alias for tests
1606    pub async fn shutdown(&self) -> Result<()> {
1607        self.stop().await
1608    }
1609
1610    /// Check if the node is running
1611    pub async fn is_running(&self) -> bool {
1612        *self.running.read().await
1613    }
1614
1615    /// Get the current listen addresses
1616    pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
1617        self.listen_addrs.read().await.clone()
1618    }
1619
1620    /// Get connected peers
1621    pub async fn connected_peers(&self) -> Vec<PeerId> {
1622        // "Connected" is defined as currently active at the transport layer.
1623        // The peers map may contain historical peers with Disconnected/Failed status.
1624        self.active_connections
1625            .read()
1626            .await
1627            .iter()
1628            .cloned()
1629            .collect()
1630    }
1631
1632    /// Get peer count
1633    pub async fn peer_count(&self) -> usize {
1634        self.active_connections.read().await.len()
1635    }
1636
1637    /// Get peer info
1638    pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1639        self.peers.read().await.get(peer_id).cloned()
1640    }
1641
1642    /// Get the peer ID for a given socket address, if connected
1643    ///
1644    /// This method searches through all connected peers to find one that has
1645    /// the specified address in its address list.
1646    ///
1647    /// # Arguments
1648    /// * `addr` - The socket address to search for (e.g., "192.168.1.100:9000")
1649    ///
1650    /// # Returns
1651    /// * `Some(PeerId)` - The peer ID if a matching connected peer is found
1652    /// * `None` - If no peer with this address is currently connected
1653    pub async fn get_peer_id_by_address(&self, addr: &str) -> Option<PeerId> {
1654        // Parse the address to a SocketAddr for comparison
1655        let socket_addr: std::net::SocketAddr = addr.parse().ok()?;
1656
1657        let peers = self.peers.read().await;
1658
1659        // Search through all connected peers
1660        for (peer_id, peer_info) in peers.iter() {
1661            // Check if this peer has a matching address
1662            for peer_addr in &peer_info.addresses {
1663                if let Ok(peer_socket) = peer_addr.parse::<std::net::SocketAddr>()
1664                    && peer_socket == socket_addr
1665                {
1666                    return Some(peer_id.clone());
1667                }
1668            }
1669        }
1670
1671        None
1672    }
1673
1674    /// List all active connections with their peer IDs and addresses
1675    ///
1676    /// # Returns
1677    /// A vector of tuples containing `(PeerId, Vec<String>)` where the `Vec<String>`
1678    /// contains all known addresses for that peer.
1679    pub async fn list_active_connections(&self) -> Vec<(PeerId, Vec<String>)> {
1680        let active = self.active_connections.read().await;
1681        let peers = self.peers.read().await;
1682
1683        active
1684            .iter()
1685            .map(|peer_id| {
1686                let addresses = peers
1687                    .get(peer_id)
1688                    .map(|info| info.addresses.clone())
1689                    .unwrap_or_default();
1690                (peer_id.clone(), addresses)
1691            })
1692            .collect()
1693    }
1694
1695    /// Remove a peer from the peers map
1696    ///
1697    /// This method removes a peer from the internal peers map. It should be used
1698    /// when a connection is no longer valid (e.g., after detecting that the underlying
1699    /// ant-quic connection has closed).
1700    ///
1701    /// # Arguments
1702    /// * `peer_id` - The ID of the peer to remove
1703    ///
1704    /// # Returns
1705    /// `true` if the peer was found and removed, `false` if the peer was not in the map
1706    pub async fn remove_peer(&self, peer_id: &PeerId) -> bool {
1707        // Remove from active connections tracking
1708        self.active_connections.write().await.remove(peer_id);
1709        // Remove from peers map and return whether it existed
1710        self.peers.write().await.remove(peer_id).is_some()
1711    }
1712
1713    /// Check if a peer is connected
1714    ///
1715    /// This method checks if the peer ID exists in the peers map. Note that this
1716    /// only verifies the peer is registered - it does not guarantee the underlying
1717    /// ant-quic connection is still active. For connection validation, use `send_message`
1718    /// which will fail if the connection is closed.
1719    ///
1720    /// # Arguments
1721    /// * `peer_id` - The ID of the peer to check
1722    ///
1723    /// # Returns
1724    /// `true` if the peer exists in the peers map, `false` otherwise
1725    pub async fn is_peer_connected(&self, peer_id: &PeerId) -> bool {
1726        self.peers.read().await.contains_key(peer_id)
1727    }
1728
1729    /// Connect to a peer
1730    pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
1731        info!("Connecting to peer at: {}", address);
1732
1733        // Check production limits if resource manager is enabled
1734        let _connection_guard = if let Some(ref resource_manager) = self.resource_manager {
1735            Some(resource_manager.acquire_connection().await?)
1736        } else {
1737            None
1738        };
1739
1740        // Parse the address to SocketAddr format
1741        let socket_addr: std::net::SocketAddr = address.parse().map_err(|e| {
1742            P2PError::Network(crate::error::NetworkError::InvalidAddress(
1743                format!("{}: {}", address, e).into(),
1744            ))
1745        })?;
1746
1747        // Normalize wildcard addresses to loopback for local connections
1748        // This converts [::]:port → ::1:port and 0.0.0.0:port → 127.0.0.1:port
1749        let normalized_addr = normalize_wildcard_to_loopback(socket_addr);
1750        if normalized_addr != socket_addr {
1751            info!(
1752                "Normalized wildcard address {} to loopback {}",
1753                socket_addr, normalized_addr
1754            );
1755        }
1756
1757        // Establish a real connection via dual-stack Happy Eyeballs, but cap the wait
1758        let addr_list = vec![normalized_addr];
1759        let peer_id = match tokio::time::timeout(
1760            self.config.connection_timeout,
1761            self.dual_node.connect_happy_eyeballs(&addr_list),
1762        )
1763        .await
1764        {
1765            Ok(Ok(peer)) => {
1766                let connected_peer_id = ant_peer_id_to_string(&peer);
1767                info!("Successfully connected to peer: {}", connected_peer_id);
1768
1769                // Prevent self-connections by checking if remote peer_id matches our own
1770                if connected_peer_id == self.peer_id {
1771                    warn!(
1772                        "Detected self-connection to own address {} (peer_id: {}), rejecting",
1773                        address, connected_peer_id
1774                    );
1775                    // Actively close the QUIC connection instead of leaking it
1776                    // until idle timeout
1777                    self.dual_node.disconnect_peer(&peer).await;
1778                    return Err(P2PError::Network(
1779                        crate::error::NetworkError::InvalidAddress(
1780                            format!("Cannot connect to self ({})", address).into(),
1781                        ),
1782                    ));
1783                }
1784
1785                connected_peer_id
1786            }
1787            Ok(Err(e)) => {
1788                warn!("Failed to connect to peer at {}: {}", address, e);
1789                return Err(P2PError::Transport(
1790                    crate::error::TransportError::ConnectionFailed {
1791                        addr: normalized_addr,
1792                        reason: e.to_string().into(),
1793                    },
1794                ));
1795            }
1796            Err(_) => {
1797                warn!(
1798                    "Timed out connecting to peer at {} after {:?}",
1799                    address, self.config.connection_timeout
1800                );
1801                return Err(P2PError::Timeout(self.config.connection_timeout));
1802            }
1803        };
1804
1805        // Create peer info with connection details
1806        let peer_info = PeerInfo {
1807            peer_id: peer_id.clone(),
1808            addresses: vec![address.to_string()],
1809            connected_at: Instant::now(),
1810            last_seen: Instant::now(),
1811            status: ConnectionStatus::Connected,
1812            protocols: vec!["p2p-foundation/1.0".to_string()],
1813            heartbeat_count: 0,
1814        };
1815
1816        // Store peer information
1817        self.peers.write().await.insert(peer_id.clone(), peer_info);
1818
1819        // Add to active connections tracking
1820        // This is critical for is_connection_active() to work correctly
1821        self.active_connections
1822            .write()
1823            .await
1824            .insert(peer_id.clone());
1825
1826        // Record bandwidth usage if resource manager is enabled
1827        if let Some(ref resource_manager) = self.resource_manager {
1828            resource_manager.record_bandwidth(0, 0); // Placeholder for handshake data
1829        }
1830
1831        // Emit connection event
1832        self.send_event(P2PEvent::PeerConnected(peer_id.clone()));
1833
1834        info!("Connected to peer: {}", peer_id);
1835        Ok(peer_id)
1836    }
1837
1838    /// Disconnect from a peer
1839    ///
1840    /// Actively closes the underlying QUIC connection via
1841    /// `P2pEndpoint::disconnect()` and removes the peer from all
1842    /// local tracking maps.
1843    pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1844        info!("Disconnecting from peer: {}", peer_id);
1845
1846        // Close the actual QUIC connection so the remote side is notified
1847        // immediately rather than waiting for idle timeout.
1848        self.dual_node.disconnect_peer_string(peer_id).await.ok();
1849
1850        // Remove from active connections
1851        self.active_connections.write().await.remove(peer_id);
1852
1853        if let Some(mut peer_info) = self.peers.write().await.remove(peer_id) {
1854            peer_info.status = ConnectionStatus::Disconnected;
1855
1856            // Emit event
1857            let _ = self
1858                .event_tx
1859                .send(P2PEvent::PeerDisconnected(peer_id.clone()));
1860
1861            info!("Disconnected from peer: {}", peer_id);
1862        }
1863
1864        Ok(())
1865    }
1866
1867    /// Check if a connection to a peer is active
1868    pub async fn is_connection_active(&self, peer_id: &str) -> bool {
1869        self.active_connections.read().await.contains(peer_id)
1870    }
1871
1872    /// Send a message to a peer
1873    pub async fn send_message(
1874        &self,
1875        peer_id: &PeerId,
1876        protocol: &str,
1877        data: Vec<u8>,
1878    ) -> Result<()> {
1879        debug!(
1880            "Sending message to peer {} on protocol {}",
1881            peer_id, protocol
1882        );
1883
1884        // Check rate limits if resource manager is enabled
1885        if let Some(ref resource_manager) = self.resource_manager
1886            && !resource_manager
1887                .check_rate_limit(peer_id, "message")
1888                .await?
1889        {
1890            return Err(P2PError::ResourceExhausted(
1891                format!("Rate limit exceeded for peer {}", peer_id).into(),
1892            ));
1893        }
1894
1895        // Check if peer exists in peers map
1896        if !self.peers.read().await.contains_key(peer_id) {
1897            return Err(P2PError::Network(crate::error::NetworkError::PeerNotFound(
1898                peer_id.to_string().into(),
1899            )));
1900        }
1901
1902        // Check if the ant-quic connection is actually active
1903        // This is the critical fix for the connection state synchronization issue
1904        if !self.is_connection_active(peer_id).await {
1905            // Clean up stale peer entry
1906            self.remove_peer(peer_id).await;
1907
1908            return Err(P2PError::Network(
1909                crate::error::NetworkError::ConnectionClosed {
1910                    peer_id: peer_id.to_string().into(),
1911                },
1912            ));
1913        }
1914
1915        // MCP removed: no special-case protocol validation
1916
1917        // Record bandwidth usage if resource manager is enabled
1918        if let Some(ref resource_manager) = self.resource_manager {
1919            resource_manager.record_bandwidth(data.len() as u64, 0);
1920        }
1921
1922        // Create protocol message wrapper
1923        let raw_data_len = data.len();
1924        let _message_data = self.create_protocol_message(protocol, data)?;
1925        info!(
1926            "Sending {} bytes to peer {} on protocol {} (raw data: {} bytes)",
1927            _message_data.len(),
1928            peer_id,
1929            protocol,
1930            raw_data_len
1931        );
1932
1933        // Send via ant-quic dual-node using the raw endpoint send method.
1934        // This uses endpoint().send() which corresponds with endpoint().recv()
1935        // in the message receiving system.
1936        let send_fut = self
1937            .dual_node
1938            .send_to_peer_string_optimized(peer_id, &_message_data);
1939        let result = tokio::time::timeout(self.config.connection_timeout, send_fut)
1940            .await
1941            .map_err(|_| {
1942                P2PError::Transport(crate::error::TransportError::StreamError(
1943                    "Timed out sending message".into(),
1944                ))
1945            })?
1946            .map_err(|e| {
1947                P2PError::Transport(crate::error::TransportError::StreamError(
1948                    e.to_string().into(),
1949                ))
1950            });
1951
1952        if result.is_ok() {
1953            info!(
1954                "Successfully sent {} bytes to peer {}",
1955                _message_data.len(),
1956                peer_id
1957            );
1958        } else {
1959            warn!("Failed to send message to peer {}", peer_id);
1960        }
1961
1962        result
1963    }
1964
1965    /// Create a protocol message wrapper
1966    fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1967        let timestamp = std::time::SystemTime::now()
1968            .duration_since(std::time::UNIX_EPOCH)
1969            .map_err(|e| {
1970                P2PError::Network(NetworkError::ProtocolError(
1971                    format!("System time error: {}", e).into(),
1972                ))
1973            })?
1974            .as_secs();
1975
1976        let message = WireMessage {
1977            protocol: protocol.to_string(),
1978            data,
1979            from: self.peer_id.clone(),
1980            timestamp,
1981        };
1982
1983        postcard::to_stdvec(&message).map_err(|e| {
1984            P2PError::Transport(crate::error::TransportError::StreamError(
1985                format!("Failed to serialize message: {e}").into(),
1986            ))
1987        })
1988    }
1989
1990    // Note: async listen_addrs() already exists above for fetching listen addresses
1991}
1992
1993/// Parse a postcard-encoded protocol message into a `P2PEvent::Message`.
1994///
1995/// Returns `None` if the bytes cannot be deserialized as a valid `WireMessage`.
1996///
1997/// The `from` field is a required part of the wire protocol but is **not**
1998/// used as the event source. Instead, `source` — the transport-level peer ID
1999/// derived from the authenticated QUIC connection — is used so that consumers
2000/// can pass it directly to `send_message()`. This eliminates a spoofing
2001/// vector where a peer could claim an arbitrary identity via the payload.
2002///
2003/// Maximum allowed clock skew for message timestamps (5 minutes).
2004/// This is intentionally lenient for initial deployment to accommodate nodes with
2005/// misconfigured clocks or high-latency network conditions. Can be tightened (e.g., to 60s)
2006/// once the network stabilizes and node clock synchronization improves.
2007const MAX_MESSAGE_AGE_SECS: u64 = 300;
2008/// Maximum allowed future timestamp (30 seconds to account for clock drift)
2009const MAX_FUTURE_SECS: u64 = 30;
2010
2011/// Helper to send an event via a broadcast sender, logging at trace level if no receivers.
2012fn broadcast_event(tx: &broadcast::Sender<P2PEvent>, event: P2PEvent) {
2013    if let Err(e) = tx.send(event) {
2014        tracing::trace!("Event broadcast has no receivers: {e}");
2015    }
2016}
2017
2018fn parse_protocol_message(bytes: &[u8], source: &str) -> Option<P2PEvent> {
2019    let message: WireMessage = postcard::from_bytes(bytes).ok()?;
2020
2021    // Validate timestamp to prevent replay attacks
2022    let now = std::time::SystemTime::now()
2023        .duration_since(std::time::UNIX_EPOCH)
2024        .map(|d| d.as_secs())
2025        .unwrap_or(0);
2026
2027    // Reject messages that are too old (potential replay)
2028    if message.timestamp < now.saturating_sub(MAX_MESSAGE_AGE_SECS) {
2029        tracing::warn!(
2030            "Rejecting stale message from {} (timestamp {} is {} seconds old)",
2031            source,
2032            message.timestamp,
2033            now.saturating_sub(message.timestamp)
2034        );
2035        return None;
2036    }
2037
2038    // Reject messages too far in the future (clock manipulation)
2039    if message.timestamp > now + MAX_FUTURE_SECS {
2040        tracing::warn!(
2041            "Rejecting future-dated message from {} (timestamp {} is {} seconds ahead)",
2042            source,
2043            message.timestamp,
2044            message.timestamp.saturating_sub(now)
2045        );
2046        return None;
2047    }
2048
2049    debug!(
2050        "Parsed P2PEvent::Message - topic: {}, source: {} (logical: {}), payload_len: {}",
2051        message.protocol,
2052        source,
2053        message.from,
2054        message.data.len()
2055    );
2056
2057    Some(P2PEvent::Message {
2058        topic: message.protocol,
2059        source: source.to_string(),
2060        data: message.data,
2061    })
2062}
2063
2064impl P2PNode {
2065    /// Subscribe to network events
2066    pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
2067        self.event_tx.subscribe()
2068    }
2069
2070    /// Backwards-compat event stream accessor for tests
2071    pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
2072        self.subscribe_events()
2073    }
2074
2075    /// Send an event to all subscribers, logging at trace level if no receivers are present.
2076    fn send_event(&self, event: P2PEvent) {
2077        if let Err(e) = self.event_tx.send(event) {
2078            tracing::trace!("Event broadcast has no receivers: {e}");
2079        }
2080    }
2081
2082    /// Get node uptime
2083    pub fn uptime(&self) -> Duration {
2084        self.start_time.elapsed()
2085    }
2086
// MCP support has been removed. This includes the former methods for handling
// remote MCP tool calls with network integration, listing the tools available
// on a specific remote peer, and reporting MCP server statistics.
2094
2095    /// Get production resource metrics
2096    pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
2097        if let Some(ref resource_manager) = self.resource_manager {
2098            Ok(resource_manager.get_metrics().await)
2099        } else {
2100            Err(P2PError::Network(
2101                crate::error::NetworkError::ProtocolError(
2102                    "Production resource manager not enabled".to_string().into(),
2103                ),
2104            ))
2105        }
2106    }
2107
2108    /// Connection lifecycle monitor task - processes ant-quic connection events
2109    /// and updates active_connections HashSet and peers map.
2110    ///
2111    /// This version accepts a pre-subscribed receiver to avoid the race condition
2112    /// where early connections could be missed if subscription happens after the task starts.
2113    #[allow(clippy::too_many_arguments)]
2114    async fn connection_lifecycle_monitor_with_rx(
2115        dual_node: Arc<DualStackNetworkNode>,
2116        mut event_rx: broadcast::Receiver<crate::transport::ant_quic_adapter::ConnectionEvent>,
2117        active_connections: Arc<RwLock<HashSet<String>>>,
2118        peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
2119        event_tx: broadcast::Sender<P2PEvent>,
2120        geo_provider: Arc<BgpGeoProvider>,
2121        _local_peer_id: String,
2122    ) {
2123        use crate::transport::ant_quic_adapter::ConnectionEvent;
2124
2125        info!("Connection lifecycle monitor started (pre-subscribed receiver)");
2126
2127        loop {
2128            match event_rx.recv().await {
2129                Ok(event) => {
2130                    match event {
2131                        ConnectionEvent::Established {
2132                            peer_id,
2133                            remote_address,
2134                        } => {
2135                            let peer_id_str = ant_peer_id_to_string(&peer_id);
2136                            debug!(
2137                                "Connection established: peer={}, addr={}",
2138                                peer_id_str, remote_address
2139                            );
2140
2141                            // **GeoIP Validation**
2142                            let ip = remote_address.ip();
2143                            let is_rejected = match ip {
2144                                std::net::IpAddr::V4(v4) => {
2145                                    if let Some(asn) = geo_provider.lookup_ipv4_asn(v4) {
2146                                        geo_provider.is_hosting_asn(asn)
2147                                            || geo_provider.is_vpn_asn(asn)
2148                                    } else {
2149                                        false
2150                                    }
2151                                }
2152                                std::net::IpAddr::V6(v6) => {
2153                                    let info = geo_provider.lookup(v6);
2154                                    info.is_hosting_provider || info.is_vpn_provider
2155                                }
2156                            };
2157
2158                            if is_rejected {
2159                                info!(
2160                                    "Rejecting connection from {} ({}) due to GeoIP policy",
2161                                    peer_id_str, remote_address
2162                                );
2163                                // Actively close the QUIC connection instead of
2164                                // leaking it until idle timeout
2165                                dual_node.disconnect_peer(&peer_id).await;
2166                                continue;
2167                            }
2168
2169                            // Add to active connections
2170                            active_connections.write().await.insert(peer_id_str.clone());
2171
2172                            // Update peer info or insert new
2173                            let mut peers_lock = peers.write().await;
2174                            if let Some(peer_info) = peers_lock.get_mut(&peer_id_str) {
2175                                peer_info.status = ConnectionStatus::Connected;
2176                                peer_info.connected_at = Instant::now();
2177                            } else {
2178                                debug!("Registering new incoming peer: {}", peer_id_str);
2179                                peers_lock.insert(
2180                                    peer_id_str.clone(),
2181                                    PeerInfo {
2182                                        peer_id: peer_id_str.clone(),
2183                                        addresses: vec![remote_address.to_string()],
2184                                        status: ConnectionStatus::Connected,
2185                                        last_seen: Instant::now(),
2186                                        connected_at: Instant::now(),
2187                                        protocols: Vec::new(),
2188                                        heartbeat_count: 0,
2189                                    },
2190                                );
2191                            }
2192
2193                            // Broadcast connection event
2194                            broadcast_event(&event_tx, P2PEvent::PeerConnected(peer_id_str));
2195                        }
2196                        ConnectionEvent::Lost { peer_id, reason } => {
2197                            let peer_id_str = ant_peer_id_to_string(&peer_id);
2198                            debug!("Connection lost: peer={peer_id_str}, reason={reason}");
2199
2200                            // Remove from active connections
2201                            active_connections.write().await.remove(&peer_id_str);
2202
2203                            // Update peer info status
2204                            if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
2205                                peer_info.status = ConnectionStatus::Disconnected;
2206                                peer_info.last_seen = Instant::now();
2207                            }
2208
2209                            // Broadcast disconnection event
2210                            broadcast_event(&event_tx, P2PEvent::PeerDisconnected(peer_id_str));
2211                        }
2212                        ConnectionEvent::Failed { peer_id, reason } => {
2213                            let peer_id_str = ant_peer_id_to_string(&peer_id);
2214                            debug!("Connection failed: peer={peer_id_str}, reason={reason}");
2215
2216                            // Remove from active connections
2217                            active_connections.write().await.remove(&peer_id_str);
2218
2219                            // Update peer info status
2220                            if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
2221                                peer_info.status = ConnectionStatus::Disconnected;
2222                                peer_info.last_seen = Instant::now();
2223                            }
2224
2225                            // Broadcast disconnection event
2226                            broadcast_event(&event_tx, P2PEvent::PeerDisconnected(peer_id_str));
2227                        }
2228                    }
2229                }
2230                Err(broadcast::error::RecvError::Lagged(skipped)) => {
2231                    warn!(
2232                        "Connection event receiver lagged, skipped {} events",
2233                        skipped
2234                    );
2235                }
2236                Err(broadcast::error::RecvError::Closed) => {
2237                    info!("Connection event channel closed, stopping lifecycle monitor");
2238                    break;
2239                }
2240            }
2241        }
2242    }
2243
2244    /// Start connection monitor (called after node initialization)
2245    async fn start_connection_monitor(&self) {
2246        // The monitor task is already spawned in new() with a temporary peers map
2247        // This method is a placeholder for future enhancements where we might
2248        // need to restart the monitor or provide it with updated references
2249        debug!("Connection monitor already running from initialization");
2250    }
2251
2252    /// Keepalive task - sends periodic pings to prevent 30-second idle timeout
2253    ///
2254    /// ant-quic has a 30-second max_idle_timeout. This task sends a small keepalive
2255    /// message every 15 seconds (half the timeout) to all active connections to prevent
2256    /// them from timing out during periods of inactivity.
2257    async fn keepalive_task(
2258        active_connections: Arc<RwLock<HashSet<String>>>,
2259        dual_node: Arc<DualStackNetworkNode>,
2260        shutdown: Arc<AtomicBool>,
2261    ) {
2262        const KEEPALIVE_INTERVAL_SECS: u64 = 15; // Half of 30-second timeout
2263
2264        let mut interval = interval(Duration::from_secs(KEEPALIVE_INTERVAL_SECS));
2265        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
2266
2267        info!(
2268            "Keepalive task started (interval: {}s)",
2269            KEEPALIVE_INTERVAL_SECS
2270        );
2271
2272        loop {
2273            // Check shutdown flag first
2274            if shutdown.load(Ordering::Relaxed) {
2275                info!("Keepalive task shutting down");
2276                break;
2277            }
2278
2279            interval.tick().await;
2280
2281            // Get snapshot of active connections
2282            let peers: Vec<String> = { active_connections.read().await.iter().cloned().collect() };
2283
2284            if peers.is_empty() {
2285                trace!("Keepalive: no active connections");
2286                continue;
2287            }
2288
2289            debug!("Sending keepalive to {} active connections", peers.len());
2290
2291            // Send keepalives concurrently so one slow/stuck peer doesn't
2292            // delay keepalives to all others and cause cascading timeouts.
2293            let futs: Vec<_> = peers
2294                .into_iter()
2295                .map(|peer_id| {
2296                    let node = Arc::clone(&dual_node);
2297                    async move {
2298                        if let Err(e) = node
2299                            .send_to_peer_string_optimized(&peer_id, KEEPALIVE_PAYLOAD)
2300                            .await
2301                        {
2302                            debug!(
2303                                "Failed to send keepalive to peer {}: {} (connection may have closed)",
2304                                peer_id, e
2305                            );
2306                        } else {
2307                            trace!("Keepalive sent to peer: {}", peer_id);
2308                        }
2309                    }
2310                })
2311                .collect();
2312            futures::future::join_all(futs).await;
2313        }
2314
2315        info!("Keepalive task stopped");
2316    }
2317
2318    /// Periodic maintenance task - detects stale peers and removes them
2319    ///
2320    /// This task runs every 100ms and:
2321    /// 1. Detects peers that haven't been seen for longer than stale_threshold
2322    /// 2. Marks them as disconnected and emits PeerDisconnected events
2323    /// 3. Removes fully disconnected peers after cleanup_threshold (2x stale_threshold)
2324    async fn periodic_maintenance_task(
2325        peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2326        active_connections: Arc<RwLock<HashSet<PeerId>>>,
2327        event_tx: broadcast::Sender<P2PEvent>,
2328        stale_threshold: Duration,
2329        shutdown: Arc<AtomicBool>,
2330    ) {
2331        let cleanup_threshold = stale_threshold * 2;
2332        let mut interval = interval(Duration::from_millis(100));
2333        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
2334
2335        info!(
2336            "Periodic maintenance task started (stale threshold: {:?})",
2337            stale_threshold
2338        );
2339
2340        loop {
2341            interval.tick().await;
2342
2343            if shutdown.load(Ordering::SeqCst) {
2344                break;
2345            }
2346
2347            let now = Instant::now();
2348            let mut peers_to_remove: Vec<PeerId> = Vec::new();
2349            let mut peers_to_mark_disconnected: Vec<PeerId> = Vec::new();
2350
2351            // Phase 1: Identify stale and disconnected peers
2352            {
2353                let peers_lock = peers.read().await;
2354                for (peer_id, peer_info) in peers_lock.iter() {
2355                    let elapsed = now.duration_since(peer_info.last_seen);
2356
2357                    match &peer_info.status {
2358                        ConnectionStatus::Connected => {
2359                            if elapsed > stale_threshold {
2360                                debug!(
2361                                    peer_id = %peer_id,
2362                                    elapsed_secs = elapsed.as_secs(),
2363                                    "Peer went stale - marking for disconnection"
2364                                );
2365                                peers_to_mark_disconnected.push(peer_id.clone());
2366                            }
2367                        }
2368                        ConnectionStatus::Disconnected | ConnectionStatus::Failed(_) => {
2369                            if elapsed > cleanup_threshold {
2370                                trace!(
2371                                    peer_id = %peer_id,
2372                                    elapsed_secs = elapsed.as_secs(),
2373                                    "Removing disconnected peer from tracking"
2374                                );
2375                                peers_to_remove.push(peer_id.clone());
2376                            }
2377                        }
2378                        ConnectionStatus::Connecting | ConnectionStatus::Disconnecting => {
2379                            if elapsed > stale_threshold {
2380                                debug!(
2381                                    peer_id = %peer_id,
2382                                    status = ?peer_info.status,
2383                                    "Connection timed out in transitional state"
2384                                );
2385                                peers_to_mark_disconnected.push(peer_id.clone());
2386                            }
2387                        }
2388                    }
2389                }
2390            }
2391
2392            // Phase 2: Mark stale peers as disconnected
2393            // Note: We do NOT reset last_seen here. The cleanup timer uses the original
2394            // last_seen timestamp, so disconnected peers are removed after cleanup_threshold
2395            // from when they were last actually seen, not from when they were marked disconnected.
2396            if !peers_to_mark_disconnected.is_empty() {
2397                let mut peers_lock = peers.write().await;
2398                for peer_id in &peers_to_mark_disconnected {
2399                    if let Some(peer_info) = peers_lock.get_mut(peer_id) {
2400                        peer_info.status = ConnectionStatus::Disconnected;
2401                    }
2402                }
2403            }
2404
2405            // Phase 3: Remove from active_connections and emit events
2406            for peer_id in &peers_to_mark_disconnected {
2407                active_connections.write().await.remove(peer_id);
2408                broadcast_event(&event_tx, P2PEvent::PeerDisconnected(peer_id.clone()));
2409                info!(peer_id = %peer_id, "Stale peer disconnected");
2410            }
2411
2412            // Phase 4: Remove fully cleaned up peers from tracking
2413            if !peers_to_remove.is_empty() {
2414                let mut peers_lock = peers.write().await;
2415                for peer_id in &peers_to_remove {
2416                    peers_lock.remove(peer_id);
2417                    trace!(peer_id = %peer_id, "Peer removed from tracking");
2418                }
2419            }
2420        }
2421
2422        info!("Periodic maintenance task stopped");
2423    }
2424
2425    /// Check system health
2426    pub async fn health_check(&self) -> Result<()> {
2427        if let Some(ref resource_manager) = self.resource_manager {
2428            resource_manager.health_check().await
2429        } else {
2430            // Basic health check without resource manager
2431            let peer_count = self.peer_count().await;
2432            if peer_count > self.config.max_connections {
2433                Err(P2PError::Network(
2434                    crate::error::NetworkError::ProtocolError(
2435                        format!("Too many connections: {peer_count}").into(),
2436                    ),
2437                ))
2438            } else {
2439                Ok(())
2440            }
2441        }
2442    }
2443
2444    /// Get production configuration (if enabled)
2445    pub fn production_config(&self) -> Option<&ProductionConfig> {
2446        self.config.production_config.as_ref()
2447    }
2448
2449    /// Check if production hardening is enabled
2450    pub fn is_production_mode(&self) -> bool {
2451        self.resource_manager.is_some()
2452    }
2453
2454    /// Get DHT reference
2455    pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
2456        self.dht.as_ref()
2457    }
2458
2459    /// Store a value in the local DHT
2460    ///
2461    /// This method stores data in the local DHT instance only. For network-wide
2462    /// replication across multiple nodes, use `DhtNetworkManager` instead,
2463    /// which owns a P2PNode for transport and coordinates replication.
2464    pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
2465        if let Some(ref dht) = self.dht {
2466            let mut dht_instance = dht.write().await;
2467            let dht_key = crate::dht::DhtKey::from_bytes(key);
2468            dht_instance.store(&dht_key, value).await.map_err(|e| {
2469                P2PError::Dht(crate::error::DhtError::StoreFailed(
2470                    format!("{:?}: {e}", key).into(),
2471                ))
2472            })?;
2473
2474            Ok(())
2475        } else {
2476            Err(P2PError::Dht(crate::error::DhtError::RoutingError(
2477                "DHT not enabled".to_string().into(),
2478            )))
2479        }
2480    }
2481
2482    /// Retrieve a value from the local DHT
2483    ///
2484    /// This method retrieves data from the local DHT instance only. For network-wide
2485    /// lookups across multiple nodes, use `DhtNetworkManager` instead,
2486    /// which owns a P2PNode for transport and coordinates distributed queries.
2487    pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
2488        if let Some(ref dht) = self.dht {
2489            let dht_instance = dht.read().await;
2490            let dht_key = crate::dht::DhtKey::from_bytes(key);
2491            let record_result = dht_instance.retrieve(&dht_key).await.map_err(|e| {
2492                P2PError::Dht(crate::error::DhtError::StoreFailed(
2493                    format!("Retrieve failed: {e}").into(),
2494                ))
2495            })?;
2496
2497            Ok(record_result)
2498        } else {
2499            Err(P2PError::Dht(crate::error::DhtError::RoutingError(
2500                "DHT not enabled".to_string().into(),
2501            )))
2502        }
2503    }
2504
2505    /// Add a discovered peer to the bootstrap cache
2506    pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
2507        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2508            let manager = bootstrap_manager.write().await;
2509            let socket_addresses: Vec<std::net::SocketAddr> = addresses
2510                .iter()
2511                .filter_map(|addr| addr.parse().ok())
2512                .collect();
2513            let contact = ContactEntry::new(peer_id, socket_addresses);
2514            manager.add_contact(contact).await.map_err(|e| {
2515                P2PError::Network(crate::error::NetworkError::ProtocolError(
2516                    format!("Failed to add peer to bootstrap cache: {e}").into(),
2517                ))
2518            })?;
2519        }
2520        Ok(())
2521    }
2522
2523    /// Update connection metrics for a peer in the bootstrap cache
2524    pub async fn update_peer_metrics(
2525        &self,
2526        peer_id: &PeerId,
2527        success: bool,
2528        latency_ms: Option<u64>,
2529        _error: Option<String>,
2530    ) -> Result<()> {
2531        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2532            let manager = bootstrap_manager.write().await;
2533
2534            // Create quality metrics based on the connection result
2535            let metrics = QualityMetrics {
2536                success_rate: if success { 1.0 } else { 0.0 },
2537                avg_latency_ms: latency_ms.unwrap_or(0) as f64,
2538                quality_score: if success { 0.8 } else { 0.2 }, // Initial score
2539                last_connection_attempt: chrono::Utc::now(),
2540                last_successful_connection: if success {
2541                    chrono::Utc::now()
2542                } else {
2543                    chrono::Utc::now() - chrono::Duration::hours(1)
2544                },
2545                uptime_score: 0.5,
2546            };
2547
2548            manager
2549                .update_contact_metrics(peer_id, metrics)
2550                .await
2551                .map_err(|e| {
2552                    P2PError::Network(crate::error::NetworkError::ProtocolError(
2553                        format!("Failed to update peer metrics: {e}").into(),
2554                    ))
2555                })?;
2556        }
2557        Ok(())
2558    }
2559
2560    /// Get bootstrap cache statistics
2561    pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
2562        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2563            let manager = bootstrap_manager.read().await;
2564            let stats = manager.get_stats().await.map_err(|e| {
2565                P2PError::Network(crate::error::NetworkError::ProtocolError(
2566                    format!("Failed to get bootstrap stats: {e}").into(),
2567                ))
2568            })?;
2569            Ok(Some(stats))
2570        } else {
2571            Ok(None)
2572        }
2573    }
2574
2575    /// Get the number of cached bootstrap peers
2576    pub async fn cached_peer_count(&self) -> usize {
2577        if let Some(ref _bootstrap_manager) = self.bootstrap_manager
2578            && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
2579        {
2580            return stats.total_contacts;
2581        }
2582        0
2583    }
2584
2585    /// Discover peers from a connected bootstrap peer using FIND_NODE
2586    ///
2587    /// Sends a FIND_NODE request for our own peer ID to discover nearby peers
2588    /// and populate the routing table. This is the core of Kademlia peer discovery.
2589    async fn discover_peers_from(&self, peer_id: &PeerId) -> Result<usize> {
2590        use crate::dht::network_integration::DhtMessage;
2591
2592        info!("Discovering peers from bootstrap peer: {}", peer_id);
2593
2594        // Create our node ID as a DhtKey for the FIND_NODE query
2595        // We query for ourselves to find nodes closest to us
2596        let our_id_bytes = {
2597            use blake3::Hasher;
2598            let mut hasher = Hasher::new();
2599            hasher.update(self.peer_id.as_bytes());
2600            let digest = hasher.finalize();
2601            let mut bytes = [0u8; 32];
2602            bytes.copy_from_slice(digest.as_bytes());
2603            bytes
2604        };
2605        let target_key = crate::dht::DhtKey::from_bytes(our_id_bytes);
2606
2607        // Create FIND_NODE message
2608        let find_node_msg = DhtMessage::FindNode {
2609            target: target_key,
2610            count: 20, // Request up to 20 closest nodes
2611        };
2612
2613        // Serialize the message
2614        let message_bytes = postcard::to_allocvec(&find_node_msg).map_err(|e| {
2615            P2PError::Network(NetworkError::ProtocolError(
2616                format!("Failed to serialize FIND_NODE message: {e}").into(),
2617            ))
2618        })?;
2619
2620        // Send the FIND_NODE request
2621        self.send_message(peer_id, "/dht/1.0.0", message_bytes)
2622            .await?;
2623
2624        // Note: Response handling is asynchronous through the message handler.
2625        // For now, we log the request and let the response handler populate
2626        // the routing table when it receives FindNodeReply.
2627        //
2628        // TODO: Implement request-response correlation with a timeout to get
2629        // actual discovered peer count. For now, return 0 to indicate we sent
2630        // the request but don't have immediate response data.
2631
2632        info!("Sent FIND_NODE request to {} for peer discovery", peer_id);
2633
2634        Ok(0) // Actual count would require awaiting the response
2635    }
2636
2637    /// Connect to bootstrap peers and perform initial peer discovery
2638    async fn connect_bootstrap_peers(&self) -> Result<()> {
2639        let mut bootstrap_contacts = Vec::new();
2640        let mut used_cache = false;
2641        let mut seen_addresses = std::collections::HashSet::new();
2642
2643        // CLI-provided bootstrap peers take priority - always include them first
2644        let cli_bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
2645            self.config.bootstrap_peers_str.clone()
2646        } else {
2647            // Convert Multiaddr to strings
2648            self.config
2649                .bootstrap_peers
2650                .iter()
2651                .map(|addr| addr.to_string())
2652                .collect::<Vec<_>>()
2653        };
2654
2655        if !cli_bootstrap_peers.is_empty() {
2656            info!(
2657                "Using {} CLI-provided bootstrap peers (priority)",
2658                cli_bootstrap_peers.len()
2659            );
2660            for addr in &cli_bootstrap_peers {
2661                if let Ok(socket_addr) = addr.parse::<std::net::SocketAddr>() {
2662                    seen_addresses.insert(socket_addr);
2663                    let contact = ContactEntry::new(
2664                        format!("cli_peer_{}", addr.chars().take(8).collect::<String>()),
2665                        vec![socket_addr],
2666                    );
2667                    bootstrap_contacts.push(contact);
2668                } else {
2669                    warn!("Invalid bootstrap address format: {}", addr);
2670                }
2671            }
2672        }
2673
2674        // Supplement with cached bootstrap peers (after CLI peers)
2675        // Use QUIC-specific peer selection since we're using ant-quic transport
2676        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2677            let manager = bootstrap_manager.read().await;
2678            match manager.get_quic_bootstrap_peers(20).await {
2679                // Try to get top 20 quality QUIC-enabled peers
2680                Ok(contacts) => {
2681                    if !contacts.is_empty() {
2682                        let mut added_from_cache = 0;
2683                        for contact in contacts {
2684                            // Only add if we haven't already added this address from CLI
2685                            let new_addresses: Vec<_> = contact
2686                                .addresses
2687                                .iter()
2688                                .filter(|addr| !seen_addresses.contains(addr))
2689                                .copied()
2690                                .collect();
2691
2692                            if !new_addresses.is_empty() {
2693                                for addr in &new_addresses {
2694                                    seen_addresses.insert(*addr);
2695                                }
2696                                let mut contact = contact.clone();
2697                                contact.addresses = new_addresses;
2698                                bootstrap_contacts.push(contact);
2699                                added_from_cache += 1;
2700                            }
2701                        }
2702                        if added_from_cache > 0 {
2703                            info!(
2704                                "Added {} cached bootstrap peers (supplementing CLI peers)",
2705                                added_from_cache
2706                            );
2707                            used_cache = true;
2708                        }
2709                    }
2710                }
2711                Err(e) => {
2712                    warn!("Failed to get cached bootstrap peers: {}", e);
2713                }
2714            }
2715        }
2716
2717        if bootstrap_contacts.is_empty() {
2718            info!("No bootstrap peers configured and no cached peers available");
2719            return Ok(());
2720        }
2721
2722        // Connect to bootstrap peers and perform peer discovery
2723        let mut successful_connections = 0;
2724        let mut connected_peer_ids: Vec<PeerId> = Vec::new();
2725
2726        for contact in bootstrap_contacts {
2727            for addr in &contact.addresses {
2728                match self.connect_peer(&addr.to_string()).await {
2729                    Ok(peer_id) => {
2730                        info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
2731                        successful_connections += 1;
2732                        connected_peer_ids.push(peer_id.clone());
2733
2734                        // Update bootstrap cache with successful connection
2735                        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2736                            let manager = bootstrap_manager.write().await;
2737                            let mut updated_contact = contact.clone();
2738                            updated_contact.peer_id = peer_id.clone();
2739                            updated_contact.update_connection_result(true, Some(100), None); // Assume 100ms latency for now
2740
2741                            if let Err(e) = manager.add_contact(updated_contact).await {
2742                                warn!("Failed to update bootstrap cache: {}", e);
2743                            }
2744                        }
2745                        break; // Successfully connected, move to next contact
2746                    }
2747                    Err(e) => {
2748                        warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
2749
2750                        // Update bootstrap cache with failed connection
2751                        if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
2752                            let manager = bootstrap_manager.write().await;
2753                            let mut updated_contact = contact.clone();
2754                            updated_contact.update_connection_result(
2755                                false,
2756                                None,
2757                                Some(e.to_string()),
2758                            );
2759
2760                            if let Err(e) = manager.add_contact(updated_contact).await {
2761                                warn!("Failed to update bootstrap cache: {}", e);
2762                            }
2763                        }
2764                    }
2765                }
2766            }
2767        }
2768
2769        if successful_connections == 0 {
2770            if !used_cache {
2771                warn!("Failed to connect to any bootstrap peers");
2772            }
2773            // Starting a node should not be gated on immediate bootstrap connectivity.
2774            // Keep running and allow background discovery / retries to populate peers later.
2775            return Ok(());
2776        }
2777
2778        info!(
2779            "Successfully connected to {} bootstrap peers",
2780            successful_connections
2781        );
2782
2783        // Perform peer discovery from connected bootstrap peers
2784        // Send FIND_NODE(self) to discover nearby peers and populate routing table
2785        for peer_id in &connected_peer_ids {
2786            match self.discover_peers_from(peer_id).await {
2787                Ok(_) => {
2788                    info!("Peer discovery initiated from bootstrap peer: {}", peer_id);
2789                }
2790                Err(e) => {
2791                    warn!("Failed to discover peers from {}: {}", peer_id, e);
2792                }
2793            }
2794        }
2795
2796        // Mark node as bootstrapped - we have connected to bootstrap peers
2797        // and initiated peer discovery
2798        self.is_bootstrapped.store(true, Ordering::SeqCst);
2799        info!(
2800            "Bootstrap complete: connected to {} peers, initiated {} discovery requests",
2801            successful_connections,
2802            connected_peer_ids.len()
2803        );
2804
2805        Ok(())
2806    }
2807
2808    /// Disconnect from all peers
2809    async fn disconnect_all_peers(&self) -> Result<()> {
2810        let peer_ids: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();
2811
2812        for peer_id in peer_ids {
2813            self.disconnect_peer(&peer_id).await?;
2814        }
2815
2816        Ok(())
2817    }
2818
2819    /// Perform periodic maintenance tasks
2820    ///
2821    /// This function is called periodically (every 100ms) to:
2822    /// 1. Detect and remove stale peers (no activity for configured threshold)
2823    /// 2. Clean up disconnected peers from tracking structures
2824    /// 3. Emit PeerDisconnected events for removed peers
2825    async fn periodic_tasks(&self) -> Result<()> {
2826        // Stale peer threshold from config (default 60s, can be reduced for tests)
2827        let stale_threshold = self.config.stale_peer_threshold;
2828        // Cleanup threshold - disconnected peers are removed after 2x the stale threshold
2829        let cleanup_threshold = stale_threshold * 2;
2830
2831        let now = Instant::now();
2832        let mut peers_to_remove: Vec<PeerId> = Vec::new();
2833        let mut peers_to_mark_disconnected: Vec<PeerId> = Vec::new();
2834
2835        // Phase 1: Identify stale and disconnected peers
2836        {
2837            let peers = self.peers.read().await;
2838            for (peer_id, peer_info) in peers.iter() {
2839                let elapsed = now.duration_since(peer_info.last_seen);
2840
2841                match &peer_info.status {
2842                    ConnectionStatus::Connected => {
2843                        // Check if connected peer has gone stale
2844                        if elapsed > stale_threshold {
2845                            debug!(
2846                                peer_id = %peer_id,
2847                                elapsed_secs = elapsed.as_secs(),
2848                                "Peer went stale - marking for disconnection"
2849                            );
2850                            peers_to_mark_disconnected.push(peer_id.clone());
2851                        }
2852                    }
2853                    ConnectionStatus::Disconnected | ConnectionStatus::Failed(_) => {
2854                        // Remove disconnected/failed peers after cleanup threshold
2855                        if elapsed > cleanup_threshold {
2856                            trace!(
2857                                peer_id = %peer_id,
2858                                elapsed_secs = elapsed.as_secs(),
2859                                "Removing disconnected peer from tracking"
2860                            );
2861                            peers_to_remove.push(peer_id.clone());
2862                        }
2863                    }
2864                    ConnectionStatus::Connecting | ConnectionStatus::Disconnecting => {
2865                        // Transitional states - check for timeout
2866                        if elapsed > stale_threshold {
2867                            debug!(
2868                                peer_id = %peer_id,
2869                                status = ?peer_info.status,
2870                                "Connection timed out in transitional state"
2871                            );
2872                            peers_to_mark_disconnected.push(peer_id.clone());
2873                        }
2874                    }
2875                }
2876            }
2877        }
2878
2879        // Phase 2: Mark stale peers as disconnected
2880        if !peers_to_mark_disconnected.is_empty() {
2881            let mut peers = self.peers.write().await;
2882            for peer_id in &peers_to_mark_disconnected {
2883                if let Some(peer_info) = peers.get_mut(peer_id) {
2884                    peer_info.status = ConnectionStatus::Disconnected;
2885                    // Preserve last_seen timestamp for cleanup logic
2886                }
2887            }
2888        }
2889
2890        // Phase 3: Remove from active_connections and emit events for disconnected peers
2891        for peer_id in &peers_to_mark_disconnected {
2892            // Remove from active connections set
2893            self.active_connections.write().await.remove(peer_id);
2894
2895            // Broadcast disconnection event
2896            let _ = self
2897                .event_tx
2898                .send(P2PEvent::PeerDisconnected(peer_id.clone()));
2899
2900            info!(
2901                peer_id = %peer_id,
2902                "Stale peer disconnected"
2903            );
2904        }
2905
2906        // Phase 4: Remove fully cleaned up peers from tracking
2907        if !peers_to_remove.is_empty() {
2908            let mut peers = self.peers.write().await;
2909            for peer_id in &peers_to_remove {
2910                peers.remove(peer_id);
2911                trace!(peer_id = %peer_id, "Peer removed from tracking");
2912            }
2913        }
2914
2915        Ok(())
2916    }
2917}
2918
2919/// Network sender trait for sending messages
2920#[async_trait::async_trait]
2921pub trait NetworkSender: Send + Sync {
2922    /// Send a message to a specific peer
2923    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;
2924
2925    /// Get our local peer ID
2926    fn local_peer_id(&self) -> &PeerId;
2927}
2928
2929/// Lightweight wrapper for P2PNode to implement NetworkSender
2930#[derive(Clone)]
2931pub struct P2PNetworkSender {
2932    peer_id: PeerId,
2933    // Use channels for async communication with the P2P node
2934    send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2935}
2936
2937impl P2PNetworkSender {
2938    pub fn new(
2939        peer_id: PeerId,
2940        send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2941    ) -> Self {
2942        Self { peer_id, send_tx }
2943    }
2944}
2945
2946/// Implementation of NetworkSender trait for P2PNetworkSender
2947#[async_trait::async_trait]
2948impl NetworkSender for P2PNetworkSender {
2949    /// Send a message to a specific peer via the P2P network
2950    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
2951        self.send_tx
2952            .send((peer_id.clone(), protocol.to_string(), data))
2953            .map_err(|_| {
2954                P2PError::Network(crate::error::NetworkError::ProtocolError(
2955                    "Failed to send message via channel".to_string().into(),
2956                ))
2957            })?;
2958        Ok(())
2959    }
2960
2961    /// Get our local peer ID
2962    fn local_peer_id(&self) -> &PeerId {
2963        &self.peer_id
2964    }
2965}
2966
2967/// Builder pattern for creating P2P nodes
2968pub struct NodeBuilder {
2969    config: NodeConfig,
2970}
2971
2972impl Default for NodeBuilder {
2973    fn default() -> Self {
2974        Self::new()
2975    }
2976}
2977
2978impl NodeBuilder {
2979    /// Create a new node builder
2980    pub fn new() -> Self {
2981        Self {
2982            config: NodeConfig::default(),
2983        }
2984    }
2985
2986    /// Set the peer ID
2987    pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
2988        self.config.peer_id = Some(peer_id);
2989        self
2990    }
2991
2992    /// Add a listen address
2993    pub fn listen_on(mut self, addr: &str) -> Self {
2994        if let Ok(multiaddr) = addr.parse() {
2995            self.config.listen_addrs.push(multiaddr);
2996        }
2997        self
2998    }
2999
3000    /// Add a bootstrap peer
3001    pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
3002        if let Ok(multiaddr) = addr.parse() {
3003            self.config.bootstrap_peers.push(multiaddr);
3004        }
3005        self.config.bootstrap_peers_str.push(addr.to_string());
3006        self
3007    }
3008
3009    /// Enable IPv6 support
3010    pub fn with_ipv6(mut self, enable: bool) -> Self {
3011        self.config.enable_ipv6 = enable;
3012        self
3013    }
3014
3015    // MCP removed: builder methods deleted
3016
3017    /// Set connection timeout
3018    pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
3019        self.config.connection_timeout = timeout;
3020        self
3021    }
3022
3023    /// Set maximum connections
3024    pub fn with_max_connections(mut self, max: usize) -> Self {
3025        self.config.max_connections = max;
3026        self
3027    }
3028
3029    /// Enable production mode with default configuration
3030    pub fn with_production_mode(mut self) -> Self {
3031        self.config.production_config = Some(ProductionConfig::default());
3032        self
3033    }
3034
3035    /// Configure production settings
3036    pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
3037        self.config.production_config = Some(production_config);
3038        self
3039    }
3040
3041    /// Configure IP diversity limits for Sybil protection.
3042    pub fn with_diversity_config(
3043        mut self,
3044        diversity_config: crate::security::IPDiversityConfig,
3045    ) -> Self {
3046        self.config.diversity_config = Some(diversity_config);
3047        self
3048    }
3049
3050    /// Configure DHT settings
3051    pub fn with_dht(mut self, dht_config: DHTConfig) -> Self {
3052        self.config.dht_config = dht_config;
3053        self
3054    }
3055
3056    /// Enable DHT with default configuration
3057    pub fn with_default_dht(mut self) -> Self {
3058        self.config.dht_config = DHTConfig::default();
3059        self
3060    }
3061
3062    /// Build the P2P node
3063    pub async fn build(self) -> Result<P2PNode> {
3064        P2PNode::new(self.config).await
3065    }
3066}
3067
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod diversity_tests {
    use super::*;
    use crate::security::IPDiversityConfig;

    /// Build a `BootstrapManager` the way production does, but with its cache
    /// directory redirected to a fresh temp dir.
    ///
    /// The `TempDir` is returned with the manager and must outlive it: dropping a
    /// `TempDir` deletes the directory, which would leave the manager pointing at
    /// a removed cache path.
    async fn build_bootstrap_manager_like_prod(
        config: &NodeConfig,
    ) -> (tempfile::TempDir, BootstrapManager) {
        let diversity_config = config.diversity_config.clone().unwrap_or_default();
        // Use a temp dir to avoid conflicts with cached files from old format
        let temp_dir = tempfile::TempDir::new().expect("temp dir");
        let mut cache_config = config.bootstrap_cache_config.clone().unwrap_or_default();
        cache_config.cache_dir = temp_dir.path().to_path_buf();

        let manager = BootstrapManager::with_full_config(
            cache_config,
            crate::rate_limit::JoinRateLimiterConfig::default(),
            diversity_config,
        )
        .await
        .expect("bootstrap manager");

        (temp_dir, manager)
    }

    #[tokio::test]
    async fn test_nodeconfig_diversity_config_used_for_bootstrap() {
        let config = NodeConfig {
            diversity_config: Some(IPDiversityConfig::testnet()),
            ..Default::default()
        };

        let (temp_dir, manager) = build_bootstrap_manager_like_prod(&config).await;
        assert!(manager.diversity_config().is_relaxed());
        assert_eq!(manager.diversity_config().max_nodes_per_asn, 5000);

        // Tear down explicitly: the manager first, then its cache directory.
        drop(manager);
        drop(temp_dir);
    }
}
3102
3103/// Helper function to register a new peer
3104async fn register_new_peer(
3105    peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
3106    peer_id: &PeerId,
3107    remote_addr: &NetworkAddress,
3108) {
3109    let mut peers_guard = peers.write().await;
3110    let peer_info = PeerInfo {
3111        peer_id: peer_id.clone(),
3112        addresses: vec![remote_addr.to_string()],
3113        connected_at: tokio::time::Instant::now(),
3114        last_seen: tokio::time::Instant::now(),
3115        status: ConnectionStatus::Connected,
3116        protocols: vec!["p2p-core/1.0.0".to_string()],
3117        heartbeat_count: 0,
3118    };
3119    peers_guard.insert(peer_id.clone(), peer_info);
3120}
3121
3122#[cfg(test)]
3123mod tests {
3124    use super::*;
3125    // MCP removed from tests
3126    use std::time::Duration;
3127    use tokio::time::timeout;
3128
    // MCP removed: the test tool handler previously used by these network tests was deleted.
3132
3133    /// Helper function to create a test node configuration
3134    fn create_test_node_config() -> NodeConfig {
3135        NodeConfig {
3136            peer_id: Some("test_peer_123".to_string()),
3137            listen_addrs: vec![
3138                std::net::SocketAddr::new(std::net::IpAddr::V6(std::net::Ipv6Addr::LOCALHOST), 0),
3139                std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
3140            ],
3141            listen_addr: std::net::SocketAddr::new(
3142                std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
3143                0,
3144            ),
3145            bootstrap_peers: vec![],
3146            bootstrap_peers_str: vec![],
3147            enable_ipv6: true,
3148
3149            connection_timeout: Duration::from_secs(2),
3150            keep_alive_interval: Duration::from_secs(30),
3151            max_connections: 100,
3152            max_incoming_connections: 50,
3153            dht_config: DHTConfig::default(),
3154            security_config: SecurityConfig::default(),
3155            production_config: None,
3156            bootstrap_cache_config: None,
3157            diversity_config: None,
3158            stale_peer_threshold: default_stale_peer_threshold(),
3159        }
3160    }
3161
    // MCP removed: the helper that created a test tool was deleted.
3164
3165    #[tokio::test]
3166    async fn test_node_config_default() {
3167        let config = NodeConfig::default();
3168
3169        assert!(config.peer_id.is_none());
3170        assert_eq!(config.listen_addrs.len(), 2);
3171        assert!(config.enable_ipv6);
3172        assert_eq!(config.max_connections, 10000); // Fixed: matches actual default
3173        assert_eq!(config.max_incoming_connections, 100);
3174        assert_eq!(config.connection_timeout, Duration::from_secs(30));
3175    }
3176
3177    #[tokio::test]
3178    async fn test_dht_config_default() {
3179        let config = DHTConfig::default();
3180
3181        assert_eq!(config.k_value, 20);
3182        assert_eq!(config.alpha_value, 5);
3183        assert_eq!(config.record_ttl, Duration::from_secs(3600));
3184        assert_eq!(config.refresh_interval, Duration::from_secs(600));
3185    }
3186
3187    #[tokio::test]
3188    async fn test_security_config_default() {
3189        let config = SecurityConfig::default();
3190
3191        assert!(config.enable_noise);
3192        assert!(config.enable_tls);
3193        assert_eq!(config.trust_level, TrustLevel::Basic);
3194    }
3195
3196    #[test]
3197    fn test_trust_level_variants() {
3198        // Test that all trust level variants can be created
3199        let _none = TrustLevel::None;
3200        let _basic = TrustLevel::Basic;
3201        let _full = TrustLevel::Full;
3202
3203        // Test equality
3204        assert_eq!(TrustLevel::None, TrustLevel::None);
3205        assert_eq!(TrustLevel::Basic, TrustLevel::Basic);
3206        assert_eq!(TrustLevel::Full, TrustLevel::Full);
3207        assert_ne!(TrustLevel::None, TrustLevel::Basic);
3208    }
3209
3210    #[test]
3211    fn test_connection_status_variants() {
3212        let connecting = ConnectionStatus::Connecting;
3213        let connected = ConnectionStatus::Connected;
3214        let disconnecting = ConnectionStatus::Disconnecting;
3215        let disconnected = ConnectionStatus::Disconnected;
3216        let failed = ConnectionStatus::Failed("test error".to_string());
3217
3218        assert_eq!(connecting, ConnectionStatus::Connecting);
3219        assert_eq!(connected, ConnectionStatus::Connected);
3220        assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
3221        assert_eq!(disconnected, ConnectionStatus::Disconnected);
3222        assert_ne!(connecting, connected);
3223
3224        if let ConnectionStatus::Failed(msg) = failed {
3225            assert_eq!(msg, "test error");
3226        } else {
3227            panic!("Expected Failed status");
3228        }
3229    }
3230
3231    #[tokio::test]
3232    async fn test_node_creation() -> Result<()> {
3233        let config = create_test_node_config();
3234        let node = P2PNode::new(config).await?;
3235
3236        assert_eq!(node.peer_id(), "test_peer_123");
3237        assert!(!node.is_running().await);
3238        assert_eq!(node.peer_count().await, 0);
3239        assert!(node.connected_peers().await.is_empty());
3240
3241        Ok(())
3242    }
3243
3244    #[tokio::test]
3245    async fn test_node_creation_without_peer_id() -> Result<()> {
3246        let mut config = create_test_node_config();
3247        config.peer_id = None;
3248
3249        let node = P2PNode::new(config).await?;
3250
3251        // Should have generated a peer ID
3252        assert!(node.peer_id().starts_with("peer_"));
3253        assert!(!node.is_running().await);
3254
3255        Ok(())
3256    }
3257
3258    #[tokio::test]
3259    async fn test_node_lifecycle() -> Result<()> {
3260        let config = create_test_node_config();
3261        let node = P2PNode::new(config).await?;
3262
3263        // Initially not running
3264        assert!(!node.is_running().await);
3265
3266        // Start the node
3267        node.start().await?;
3268        assert!(node.is_running().await);
3269
3270        // Check listen addresses were set (at least one)
3271        let listen_addrs = node.listen_addrs().await;
3272        assert!(
3273            !listen_addrs.is_empty(),
3274            "Expected at least one listening address"
3275        );
3276
3277        // Stop the node
3278        node.stop().await?;
3279        assert!(!node.is_running().await);
3280
3281        Ok(())
3282    }
3283
3284    #[tokio::test]
3285    async fn test_peer_connection() -> Result<()> {
3286        let config1 = create_test_node_config();
3287        let mut config2 = create_test_node_config();
3288        config2.peer_id = Some("test_peer_456".to_string());
3289
3290        let node1 = P2PNode::new(config1).await?;
3291        let node2 = P2PNode::new(config2).await?;
3292
3293        node1.start().await?;
3294        node2.start().await?;
3295
3296        let node2_addr = node2
3297            .listen_addrs()
3298            .await
3299            .into_iter()
3300            .find(|a| a.ip().is_ipv4())
3301            .ok_or_else(|| {
3302                P2PError::Network(crate::error::NetworkError::InvalidAddress(
3303                    "Node 2 did not expose an IPv4 listen address".into(),
3304                ))
3305            })?;
3306
3307        // Connect to a real peer
3308        let peer_id = node1.connect_peer(&node2_addr.to_string()).await?;
3309
3310        // Check peer count
3311        assert_eq!(node1.peer_count().await, 1);
3312
3313        // Check connected peers
3314        let connected_peers = node1.connected_peers().await;
3315        assert_eq!(connected_peers.len(), 1);
3316        assert_eq!(connected_peers[0], peer_id);
3317
3318        // Get peer info
3319        let peer_info = node1.peer_info(&peer_id).await;
3320        assert!(peer_info.is_some());
3321        let info = peer_info.expect("Peer info should exist after adding peer");
3322        assert_eq!(info.peer_id, peer_id);
3323        assert_eq!(info.status, ConnectionStatus::Connected);
3324        assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
3325
3326        // Disconnect from peer
3327        node1.disconnect_peer(&peer_id).await?;
3328        assert_eq!(node1.peer_count().await, 0);
3329
3330        node1.stop().await?;
3331        node2.stop().await?;
3332
3333        Ok(())
3334    }
3335
    // TODO(windows): Investigate QUIC connection issues on Windows CI
    // This test consistently fails on Windows GitHub Actions runners with
    // "All connect attempts failed" even with IPv4-only config, long delays,
    // and multiple retry attempts. The underlying ant-quic library may have
    // issues on Windows that need investigation.
    // See: https://github.com/dirvine/saorsa-core/issues/TBD
    //
    // Checks that a subscriber on node1 receives:
    // - a `PeerConnected` event after `connect_peer` succeeds;
    // - a `PeerDisconnected` event after `disconnect_peer`.
    // Both events must carry the peer ID that `connect_peer` returned.
    // NOTE(review): each check assumes the expected event is the *next* one on
    // the channel; any other event emitted in between would fail the test.
    #[cfg_attr(target_os = "windows", ignore)]
    #[tokio::test]
    async fn test_event_subscription() -> Result<()> {
        // Configure both nodes to use only IPv4 for reliable cross-platform testing
        // This is important because:
        // 1. local_addr() returns the first address from listen_addrs
        // 2. The default config puts IPv6 first, which may not work on all Windows setups
        let ipv4_localhost =
            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);

        let mut config1 = create_test_node_config();
        config1.listen_addr = ipv4_localhost;
        config1.listen_addrs = vec![ipv4_localhost];
        config1.enable_ipv6 = false;

        // node2 gets its own ID; the shared test config hard-codes "test_peer_123".
        let mut config2 = create_test_node_config();
        config2.peer_id = Some("test_peer_456".to_string());
        config2.listen_addr = ipv4_localhost;
        config2.listen_addrs = vec![ipv4_localhost];
        config2.enable_ipv6 = false;

        let node1 = P2PNode::new(config1).await?;
        let node2 = P2PNode::new(config2).await?;

        node1.start().await?;
        node2.start().await?;

        // Wait for nodes to fully bind their listening sockets
        // Windows network stack initialization can be significantly slower
        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;

        // Subscribe before dialling so the connect event is not missed
        // (presumably a broadcast-style receiver that only sees events sent
        // after it subscribes — verify against `subscribe_events`).
        let mut events = node1.subscribe_events();

        // Get the actual listening address using local_addr() for reliability
        let node2_addr = node2.local_addr().ok_or_else(|| {
            P2PError::Network(crate::error::NetworkError::ProtocolError(
                "No listening address".to_string().into(),
            ))
        })?;

        // Connect to a peer with retry logic for Windows reliability
        // The QUIC library may need additional time to fully initialize
        // Up to 3 attempts, each bounded by a 2s timeout. A connect error and a
        // timeout are treated alike: pause 200ms, then retry.
        let mut peer_id = None;
        for attempt in 0..3 {
            if attempt > 0 {
                tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
            }
            match timeout(Duration::from_secs(2), node1.connect_peer(&node2_addr)).await {
                Ok(Ok(id)) => {
                    peer_id = Some(id);
                    break;
                }
                Ok(Err(_)) | Err(_) => continue,
            }
        }
        let peer_id = peer_id.ok_or_else(|| {
            P2PError::Network(crate::error::NetworkError::ProtocolError(
                "Failed to connect after 3 attempts".to_string().into(),
            ))
        })?;

        // Check for PeerConnected event
        // Outer Result: did an event arrive within 2s? Inner Result: the receive itself.
        let event = timeout(Duration::from_secs(2), events.recv()).await;
        assert!(event.is_ok());

        let event_result = event
            .expect("Should receive event")
            .expect("Event should not be error");
        match event_result {
            P2PEvent::PeerConnected(event_peer_id) => {
                assert_eq!(event_peer_id, peer_id);
            }
            _ => panic!("Expected PeerConnected event"),
        }

        // Disconnect from peer (this should emit another event)
        node1.disconnect_peer(&peer_id).await?;

        // Check for PeerDisconnected event
        let event = timeout(Duration::from_secs(2), events.recv()).await;
        assert!(event.is_ok());

        let event_result = event
            .expect("Should receive event")
            .expect("Event should not be error");
        match event_result {
            P2PEvent::PeerDisconnected(event_peer_id) => {
                assert_eq!(event_peer_id, peer_id);
            }
            _ => panic!("Expected PeerDisconnected event"),
        }

        node1.stop().await?;
        node2.stop().await?;

        Ok(())
    }
3439
3440    // TODO(windows): Same QUIC connection issues as test_event_subscription
3441    #[cfg_attr(target_os = "windows", ignore)]
3442    #[tokio::test]
3443    async fn test_message_sending() -> Result<()> {
3444        // Create two nodes
3445        let mut config1 = create_test_node_config();
3446        config1.listen_addr =
3447            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
3448        let node1 = P2PNode::new(config1).await?;
3449        node1.start().await?;
3450
3451        let mut config2 = create_test_node_config();
3452        config2.listen_addr =
3453            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
3454        let node2 = P2PNode::new(config2).await?;
3455        node2.start().await?;
3456
3457        // Wait a bit for nodes to start listening
3458        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
3459
3460        // Get actual listening address of node2
3461        let node2_addr = node2.local_addr().ok_or_else(|| {
3462            P2PError::Network(crate::error::NetworkError::ProtocolError(
3463                "No listening address".to_string().into(),
3464            ))
3465        })?;
3466
3467        // Connect node1 to node2
3468        let peer_id =
3469            match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
3470                Ok(res) => res?,
3471                Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
3472            };
3473
3474        // Wait a bit for connection to establish
3475        tokio::time::sleep(tokio::time::Duration::from_millis(30)).await;
3476
3477        // Send a message
3478        let message_data = b"Hello, peer!".to_vec();
3479        let result = match timeout(
3480            Duration::from_millis(500),
3481            node1.send_message(&peer_id, "test-protocol", message_data),
3482        )
3483        .await
3484        {
3485            Ok(res) => res,
3486            Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
3487        };
3488        // For now, we'll just check that we don't get a "not connected" error
3489        // The actual send might fail due to no handler on the other side
3490        if let Err(e) = &result {
3491            assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
3492        }
3493
3494        // Try to send to non-existent peer
3495        let non_existent_peer = "non_existent_peer".to_string();
3496        let result = node1
3497            .send_message(&non_existent_peer, "test-protocol", vec![])
3498            .await;
3499        assert!(result.is_err(), "Sending to non-existent peer should fail");
3500
3501        Ok(())
3502    }
3503
3504    #[tokio::test]
3505    async fn test_remote_mcp_operations() -> Result<()> {
3506        let config = create_test_node_config();
3507        let node = P2PNode::new(config).await?;
3508
3509        // MCP removed; test reduced to simple start/stop
3510        node.start().await?;
3511        node.stop().await?;
3512        Ok(())
3513    }
3514
3515    #[tokio::test]
3516    async fn test_health_check() -> Result<()> {
3517        let config = create_test_node_config();
3518        let node = P2PNode::new(config).await?;
3519
3520        // Health check should pass with no connections
3521        let result = node.health_check().await;
3522        assert!(result.is_ok());
3523
3524        // Note: We're not actually connecting to real peers here
3525        // since that would require running bootstrap nodes.
3526        // The health check should still pass with no connections.
3527
3528        Ok(())
3529    }
3530
3531    #[tokio::test]
3532    async fn test_node_uptime() -> Result<()> {
3533        let config = create_test_node_config();
3534        let node = P2PNode::new(config).await?;
3535
3536        let uptime1 = node.uptime();
3537        assert!(uptime1 >= Duration::from_secs(0));
3538
3539        // Wait a bit
3540        tokio::time::sleep(Duration::from_millis(10)).await;
3541
3542        let uptime2 = node.uptime();
3543        assert!(uptime2 > uptime1);
3544
3545        Ok(())
3546    }
3547
3548    #[tokio::test]
3549    async fn test_node_config_access() -> Result<()> {
3550        let config = create_test_node_config();
3551        let expected_peer_id = config.peer_id.clone();
3552        let node = P2PNode::new(config).await?;
3553
3554        let node_config = node.config();
3555        assert_eq!(node_config.peer_id, expected_peer_id);
3556        assert_eq!(node_config.max_connections, 100);
3557        // MCP removed
3558
3559        Ok(())
3560    }
3561
3562    #[tokio::test]
3563    async fn test_mcp_server_access() -> Result<()> {
3564        let config = create_test_node_config();
3565        let _node = P2PNode::new(config).await?;
3566
3567        // MCP removed
3568        Ok(())
3569    }
3570
3571    #[tokio::test]
3572    async fn test_dht_access() -> Result<()> {
3573        let config = create_test_node_config();
3574        let node = P2PNode::new(config).await?;
3575
3576        // Should have DHT
3577        assert!(node.dht().is_some());
3578
3579        Ok(())
3580    }
3581
3582    #[tokio::test]
3583    async fn test_node_builder() -> Result<()> {
3584        // Create a config using the builder but don't actually build a real node
3585        let builder = P2PNode::builder()
3586            .with_peer_id("builder_test_peer".to_string())
3587            .listen_on("/ip4/127.0.0.1/tcp/0")
3588            .listen_on("/ip6/::1/tcp/0")
3589            .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9000") // Use a valid port number
3590            .with_ipv6(true)
3591            .with_connection_timeout(Duration::from_secs(15))
3592            .with_max_connections(200);
3593
3594        // Test the configuration that was built
3595        let config = builder.config;
3596        assert_eq!(config.peer_id, Some("builder_test_peer".to_string()));
3597        assert_eq!(config.listen_addrs.len(), 2); // 2 added by builder (no defaults)
3598        assert_eq!(config.bootstrap_peers_str.len(), 1); // Check bootstrap_peers_str instead
3599        assert!(config.enable_ipv6);
3600        assert_eq!(config.connection_timeout, Duration::from_secs(15));
3601        assert_eq!(config.max_connections, 200);
3602
3603        Ok(())
3604    }
3605
3606    #[tokio::test]
3607    async fn test_bootstrap_peers() -> Result<()> {
3608        let mut config = create_test_node_config();
3609        config.bootstrap_peers = vec![
3610            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9200),
3611            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9201),
3612        ];
3613
3614        let node = P2PNode::new(config).await?;
3615
3616        // Start node (which attempts to connect to bootstrap peers)
3617        node.start().await?;
3618
3619        // In a test environment, bootstrap peers may not be available
3620        // The test verifies the node starts correctly with bootstrap configuration
3621        // Peer count may include local/internal tracking, so we just verify it's reasonable
3622        let _peer_count = node.peer_count().await;
3623
3624        node.stop().await?;
3625        Ok(())
3626    }
3627
3628    #[tokio::test]
3629    async fn test_production_mode_disabled() -> Result<()> {
3630        let config = create_test_node_config();
3631        let node = P2PNode::new(config).await?;
3632
3633        assert!(!node.is_production_mode());
3634        assert!(node.production_config().is_none());
3635
3636        // Resource metrics should fail when production mode is disabled
3637        let result = node.resource_metrics().await;
3638        assert!(result.is_err());
3639        assert!(result.unwrap_err().to_string().contains("not enabled"));
3640
3641        Ok(())
3642    }
3643
3644    #[tokio::test]
3645    async fn test_network_event_variants() {
3646        // Test that all network event variants can be created
3647        let peer_id = "test_peer".to_string();
3648        let address = "/ip4/127.0.0.1/tcp/9000".to_string();
3649
3650        let _peer_connected = NetworkEvent::PeerConnected {
3651            peer_id: peer_id.clone(),
3652            addresses: vec![address.clone()],
3653        };
3654
3655        let _peer_disconnected = NetworkEvent::PeerDisconnected {
3656            peer_id: peer_id.clone(),
3657            reason: "test disconnect".to_string(),
3658        };
3659
3660        let _message_received = NetworkEvent::MessageReceived {
3661            peer_id: peer_id.clone(),
3662            protocol: "test-protocol".to_string(),
3663            data: vec![1, 2, 3],
3664        };
3665
3666        let _connection_failed = NetworkEvent::ConnectionFailed {
3667            peer_id: Some(peer_id.clone()),
3668            address: address.clone(),
3669            error: "connection refused".to_string(),
3670        };
3671
3672        let _dht_stored = NetworkEvent::DHTRecordStored {
3673            key: vec![1, 2, 3],
3674            value: vec![4, 5, 6],
3675        };
3676
3677        let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
3678            key: vec![1, 2, 3],
3679            value: Some(vec![4, 5, 6]),
3680        };
3681    }
3682
3683    #[tokio::test]
3684    async fn test_peer_info_structure() {
3685        let peer_info = PeerInfo {
3686            peer_id: "test_peer".to_string(),
3687            addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
3688            connected_at: Instant::now(),
3689            last_seen: Instant::now(),
3690            status: ConnectionStatus::Connected,
3691            protocols: vec!["test-protocol".to_string()],
3692            heartbeat_count: 0,
3693        };
3694
3695        assert_eq!(peer_info.peer_id, "test_peer");
3696        assert_eq!(peer_info.addresses.len(), 1);
3697        assert_eq!(peer_info.status, ConnectionStatus::Connected);
3698        assert_eq!(peer_info.protocols.len(), 1);
3699    }
3700
3701    #[tokio::test]
3702    async fn test_serialization() -> Result<()> {
3703        // Test that configs can be serialized/deserialized
3704        let config = create_test_node_config();
3705        let serialized = serde_json::to_string(&config)?;
3706        let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
3707
3708        assert_eq!(config.peer_id, deserialized.peer_id);
3709        assert_eq!(config.listen_addrs, deserialized.listen_addrs);
3710        assert_eq!(config.enable_ipv6, deserialized.enable_ipv6);
3711
3712        Ok(())
3713    }
3714
3715    #[tokio::test]
3716    async fn test_get_peer_id_by_address_found() -> Result<()> {
3717        let config = create_test_node_config();
3718        let node = P2PNode::new(config).await?;
3719
3720        // Manually insert a peer for testing
3721        let test_peer_id = "peer_test_123".to_string();
3722        let test_address = "192.168.1.100:9000".to_string();
3723
3724        let peer_info = PeerInfo {
3725            peer_id: test_peer_id.clone(),
3726            addresses: vec![test_address.clone()],
3727            connected_at: Instant::now(),
3728            last_seen: Instant::now(),
3729            status: ConnectionStatus::Connected,
3730            protocols: vec!["test-protocol".to_string()],
3731            heartbeat_count: 0,
3732        };
3733
3734        node.peers
3735            .write()
3736            .await
3737            .insert(test_peer_id.clone(), peer_info);
3738
3739        // Test: Find peer by address
3740        let found_peer_id = node.get_peer_id_by_address(&test_address).await;
3741        assert_eq!(found_peer_id, Some(test_peer_id));
3742
3743        Ok(())
3744    }
3745
3746    #[tokio::test]
3747    async fn test_get_peer_id_by_address_not_found() -> Result<()> {
3748        let config = create_test_node_config();
3749        let node = P2PNode::new(config).await?;
3750
3751        // Test: Try to find a peer that doesn't exist
3752        let result = node.get_peer_id_by_address("192.168.1.200:9000").await;
3753        assert_eq!(result, None);
3754
3755        Ok(())
3756    }
3757
3758    #[tokio::test]
3759    async fn test_get_peer_id_by_address_invalid_format() -> Result<()> {
3760        let config = create_test_node_config();
3761        let node = P2PNode::new(config).await?;
3762
3763        // Test: Invalid address format should return None
3764        let result = node.get_peer_id_by_address("invalid-address").await;
3765        assert_eq!(result, None);
3766
3767        Ok(())
3768    }
3769
3770    #[tokio::test]
3771    async fn test_get_peer_id_by_address_multiple_peers() -> Result<()> {
3772        let config = create_test_node_config();
3773        let node = P2PNode::new(config).await?;
3774
3775        // Add multiple peers with different addresses
3776        let peer1_id = "peer_1".to_string();
3777        let peer1_addr = "192.168.1.101:9001".to_string();
3778
3779        let peer2_id = "peer_2".to_string();
3780        let peer2_addr = "192.168.1.102:9002".to_string();
3781
3782        let peer1_info = PeerInfo {
3783            peer_id: peer1_id.clone(),
3784            addresses: vec![peer1_addr.clone()],
3785            connected_at: Instant::now(),
3786            last_seen: Instant::now(),
3787            status: ConnectionStatus::Connected,
3788            protocols: vec!["test-protocol".to_string()],
3789            heartbeat_count: 0,
3790        };
3791
3792        let peer2_info = PeerInfo {
3793            peer_id: peer2_id.clone(),
3794            addresses: vec![peer2_addr.clone()],
3795            connected_at: Instant::now(),
3796            last_seen: Instant::now(),
3797            status: ConnectionStatus::Connected,
3798            protocols: vec!["test-protocol".to_string()],
3799            heartbeat_count: 0,
3800        };
3801
3802        node.peers
3803            .write()
3804            .await
3805            .insert(peer1_id.clone(), peer1_info);
3806        node.peers
3807            .write()
3808            .await
3809            .insert(peer2_id.clone(), peer2_info);
3810
3811        // Test: Find each peer by their unique address
3812        let found_peer1 = node.get_peer_id_by_address(&peer1_addr).await;
3813        let found_peer2 = node.get_peer_id_by_address(&peer2_addr).await;
3814
3815        assert_eq!(found_peer1, Some(peer1_id));
3816        assert_eq!(found_peer2, Some(peer2_id));
3817
3818        Ok(())
3819    }
3820
3821    #[tokio::test]
3822    async fn test_list_active_connections_empty() -> Result<()> {
3823        let config = create_test_node_config();
3824        let node = P2PNode::new(config).await?;
3825
3826        // Test: No connections initially
3827        let connections = node.list_active_connections().await;
3828        assert!(connections.is_empty());
3829
3830        Ok(())
3831    }
3832
3833    #[tokio::test]
3834    async fn test_list_active_connections_with_peers() -> Result<()> {
3835        let config = create_test_node_config();
3836        let node = P2PNode::new(config).await?;
3837
3838        // Add multiple peers
3839        let peer1_id = "peer_1".to_string();
3840        let peer1_addrs = vec![
3841            "192.168.1.101:9001".to_string(),
3842            "192.168.1.101:9002".to_string(),
3843        ];
3844
3845        let peer2_id = "peer_2".to_string();
3846        let peer2_addrs = vec!["192.168.1.102:9003".to_string()];
3847
3848        let peer1_info = PeerInfo {
3849            peer_id: peer1_id.clone(),
3850            addresses: peer1_addrs.clone(),
3851            connected_at: Instant::now(),
3852            last_seen: Instant::now(),
3853            status: ConnectionStatus::Connected,
3854            protocols: vec!["test-protocol".to_string()],
3855            heartbeat_count: 0,
3856        };
3857
3858        let peer2_info = PeerInfo {
3859            peer_id: peer2_id.clone(),
3860            addresses: peer2_addrs.clone(),
3861            connected_at: Instant::now(),
3862            last_seen: Instant::now(),
3863            status: ConnectionStatus::Connected,
3864            protocols: vec!["test-protocol".to_string()],
3865            heartbeat_count: 0,
3866        };
3867
3868        node.peers
3869            .write()
3870            .await
3871            .insert(peer1_id.clone(), peer1_info);
3872        node.peers
3873            .write()
3874            .await
3875            .insert(peer2_id.clone(), peer2_info);
3876
3877        // Also add to active_connections (list_active_connections iterates over this)
3878        node.active_connections
3879            .write()
3880            .await
3881            .insert(peer1_id.clone());
3882        node.active_connections
3883            .write()
3884            .await
3885            .insert(peer2_id.clone());
3886
3887        // Test: List all active connections
3888        let connections = node.list_active_connections().await;
3889        assert_eq!(connections.len(), 2);
3890
3891        // Verify peer1 and peer2 are in the list
3892        let peer1_conn = connections.iter().find(|(id, _)| id == &peer1_id);
3893        let peer2_conn = connections.iter().find(|(id, _)| id == &peer2_id);
3894
3895        assert!(peer1_conn.is_some());
3896        assert!(peer2_conn.is_some());
3897
3898        // Verify addresses match
3899        assert_eq!(peer1_conn.unwrap().1, peer1_addrs);
3900        assert_eq!(peer2_conn.unwrap().1, peer2_addrs);
3901
3902        Ok(())
3903    }
3904
3905    #[tokio::test]
3906    async fn test_remove_peer_success() -> Result<()> {
3907        let config = create_test_node_config();
3908        let node = P2PNode::new(config).await?;
3909
3910        // Add a peer
3911        let peer_id = "peer_to_remove".to_string();
3912        let peer_info = PeerInfo {
3913            peer_id: peer_id.clone(),
3914            addresses: vec!["192.168.1.100:9000".to_string()],
3915            connected_at: Instant::now(),
3916            last_seen: Instant::now(),
3917            status: ConnectionStatus::Connected,
3918            protocols: vec!["test-protocol".to_string()],
3919            heartbeat_count: 0,
3920        };
3921
3922        node.peers.write().await.insert(peer_id.clone(), peer_info);
3923
3924        // Verify peer exists
3925        assert!(node.is_peer_connected(&peer_id).await);
3926
3927        // Remove the peer
3928        let removed = node.remove_peer(&peer_id).await;
3929        assert!(removed);
3930
3931        // Verify peer no longer exists
3932        assert!(!node.is_peer_connected(&peer_id).await);
3933
3934        Ok(())
3935    }
3936
3937    #[tokio::test]
3938    async fn test_remove_peer_nonexistent() -> Result<()> {
3939        let config = create_test_node_config();
3940        let node = P2PNode::new(config).await?;
3941
3942        // Try to remove a peer that doesn't exist
3943        let removed = node.remove_peer(&"nonexistent_peer".to_string()).await;
3944        assert!(!removed);
3945
3946        Ok(())
3947    }
3948
3949    #[tokio::test]
3950    async fn test_is_peer_connected() -> Result<()> {
3951        let config = create_test_node_config();
3952        let node = P2PNode::new(config).await?;
3953
3954        let peer_id = "test_peer".to_string();
3955
3956        // Initially not connected
3957        assert!(!node.is_peer_connected(&peer_id).await);
3958
3959        // Add peer
3960        let peer_info = PeerInfo {
3961            peer_id: peer_id.clone(),
3962            addresses: vec!["192.168.1.100:9000".to_string()],
3963            connected_at: Instant::now(),
3964            last_seen: Instant::now(),
3965            status: ConnectionStatus::Connected,
3966            protocols: vec!["test-protocol".to_string()],
3967            heartbeat_count: 0,
3968        };
3969
3970        node.peers.write().await.insert(peer_id.clone(), peer_info);
3971
3972        // Now connected
3973        assert!(node.is_peer_connected(&peer_id).await);
3974
3975        // Remove peer
3976        node.remove_peer(&peer_id).await;
3977
3978        // No longer connected
3979        assert!(!node.is_peer_connected(&peer_id).await);
3980
3981        Ok(())
3982    }
3983
3984    #[test]
3985    fn test_normalize_ipv6_wildcard() {
3986        use std::net::{IpAddr, Ipv6Addr, SocketAddr};
3987
3988        let wildcard = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 8080);
3989        let normalized = normalize_wildcard_to_loopback(wildcard);
3990
3991        assert_eq!(normalized.ip(), IpAddr::V6(Ipv6Addr::LOCALHOST));
3992        assert_eq!(normalized.port(), 8080);
3993    }
3994
3995    #[test]
3996    fn test_normalize_ipv4_wildcard() {
3997        use std::net::{IpAddr, Ipv4Addr, SocketAddr};
3998
3999        let wildcard = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9000);
4000        let normalized = normalize_wildcard_to_loopback(wildcard);
4001
4002        assert_eq!(normalized.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
4003        assert_eq!(normalized.port(), 9000);
4004    }
4005
4006    #[test]
4007    fn test_normalize_specific_address_unchanged() {
4008        let specific: std::net::SocketAddr = "192.168.1.100:3000".parse().unwrap();
4009        let normalized = normalize_wildcard_to_loopback(specific);
4010
4011        assert_eq!(normalized, specific);
4012    }
4013
4014    #[test]
4015    fn test_normalize_loopback_unchanged() {
4016        use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
4017
4018        let loopback_v6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5000);
4019        let normalized_v6 = normalize_wildcard_to_loopback(loopback_v6);
4020        assert_eq!(normalized_v6, loopback_v6);
4021
4022        let loopback_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
4023        let normalized_v4 = normalize_wildcard_to_loopback(loopback_v4);
4024        assert_eq!(normalized_v4, loopback_v4);
4025    }
4026
4027    // ---- parse_protocol_message regression tests ----
4028
4029    /// Get current Unix timestamp for tests
4030    fn current_timestamp() -> u64 {
4031        std::time::SystemTime::now()
4032            .duration_since(std::time::UNIX_EPOCH)
4033            .map(|d| d.as_secs())
4034            .unwrap_or(0)
4035    }
4036
4037    /// Helper to create a postcard-serialized WireMessage for tests
4038    fn make_wire_bytes(protocol: &str, data: Vec<u8>, from: &str, timestamp: u64) -> Vec<u8> {
4039        let msg = WireMessage {
4040            protocol: protocol.to_string(),
4041            data,
4042            from: from.to_string(),
4043            timestamp,
4044        };
4045        postcard::to_stdvec(&msg).unwrap()
4046    }
4047
4048    #[test]
4049    fn test_parse_protocol_message_uses_transport_peer_id_as_source() {
4050        // Regression: P2PEvent::Message.source must be the transport peer ID,
4051        // NOT the "from" field from the wire message.  This ensures consumers
4052        // can pass source directly to send_message().
4053        let transport_id = "abcdef0123456789";
4054        let logical_id = "spoofed-logical-id";
4055        let bytes = make_wire_bytes("test/v1", vec![1, 2, 3], logical_id, current_timestamp());
4056
4057        let event =
4058            parse_protocol_message(&bytes, transport_id).expect("valid message should parse");
4059
4060        match event {
4061            P2PEvent::Message {
4062                topic,
4063                source,
4064                data,
4065            } => {
4066                assert_eq!(source, transport_id, "source must be the transport peer ID");
4067                assert_ne!(
4068                    source, logical_id,
4069                    "source must NOT be the logical 'from' field"
4070                );
4071                assert_eq!(topic, "test/v1");
4072                assert_eq!(data, vec![1u8, 2, 3]);
4073            }
4074            other => panic!("expected P2PEvent::Message, got {:?}", other),
4075        }
4076    }
4077
4078    #[test]
4079    fn test_parse_protocol_message_rejects_invalid_bytes() {
4080        // Random bytes that are not valid bincode should be rejected
4081        assert!(parse_protocol_message(b"not valid bincode", "peer-id").is_none());
4082    }
4083
4084    #[test]
4085    fn test_parse_protocol_message_rejects_truncated_message() {
4086        // A truncated bincode message should fail to deserialize
4087        let full_bytes = make_wire_bytes("test/v1", vec![1, 2, 3], "sender", current_timestamp());
4088        let truncated = &full_bytes[..full_bytes.len() / 2];
4089        assert!(parse_protocol_message(truncated, "peer-id").is_none());
4090    }
4091
4092    #[test]
4093    fn test_parse_protocol_message_empty_payload() {
4094        let bytes = make_wire_bytes("ping", vec![], "sender", current_timestamp());
4095
4096        let event = parse_protocol_message(&bytes, "transport-peer")
4097            .expect("valid message with empty data should parse");
4098
4099        match event {
4100            P2PEvent::Message { data, .. } => assert!(data.is_empty()),
4101            other => panic!("expected P2PEvent::Message, got {:?}", other),
4102        }
4103    }
4104
4105    #[test]
4106    fn test_parse_protocol_message_preserves_binary_payload() {
4107        // Verify that arbitrary byte values (including 0xFF, 0x00) survive round-trip
4108        let payload: Vec<u8> = (0..=255).collect();
4109        let bytes = make_wire_bytes("binary/v1", payload.clone(), "sender", current_timestamp());
4110
4111        let event = parse_protocol_message(&bytes, "peer-id")
4112            .expect("valid message with full byte range should parse");
4113
4114        match event {
4115            P2PEvent::Message { data, topic, .. } => {
4116                assert_eq!(topic, "binary/v1");
4117                assert_eq!(
4118                    data, payload,
4119                    "payload must survive bincode round-trip exactly"
4120                );
4121            }
4122            other => panic!("expected P2PEvent::Message, got {:?}", other),
4123        }
4124    }
4125}