Skip to main content

saorsa_core/
network.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: david@saorsalabs.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! Network module
15//!
16//! This module provides core networking functionality for the P2P Foundation.
17//! It handles peer connections, network events, and node lifecycle management.
18
19use crate::adaptive::{
20    EigenTrustEngine, NodeId, NodeId as AdaptiveNodeId, NodeStatisticsUpdate, TrustProvider,
21};
22use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
23use crate::config::Config;
24use crate::dht_network_manager::{DhtNetworkConfig, DhtNetworkManager};
25use crate::error::{NetworkError, P2PError, P2pResult as Result, PeerFailureReason};
26
27use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
28use crate::{NetworkAddress, PeerId};
29use serde::{Deserialize, Serialize};
30use std::collections::{HashMap, HashSet};
31use std::sync::Arc;
32use std::sync::atomic::{AtomicBool, Ordering};
33use std::time::Duration;
34use tokio::sync::{RwLock, broadcast};
35use tokio::time::Instant;
36use tokio_util::sync::CancellationToken;
37use tracing::{debug, info, warn};
38
/// Wire protocol message format for P2P communication.
///
/// Serialized with bincode for compact binary encoding.
/// Replaces the previous JSON format for better performance
/// and smaller wire size.
///
/// NOTE(review): bincode encodes struct fields positionally, so reordering,
/// adding, or removing fields presumably changes the wire format — confirm
/// compatibility with deployed peers before editing this struct.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct WireMessage {
    /// Protocol/topic identifier
    pub(crate) protocol: String,
    /// Raw payload bytes
    pub(crate) data: Vec<u8>,
    /// Sender's peer ID (verified against transport-level identity)
    ///
    /// NOTE(review): the verification itself is not performed in this struct;
    /// it presumably happens in the receive path — confirm.
    pub(crate) from: String,
    /// Unix timestamp in seconds
    pub(crate) timestamp: u64,
}
55
/// Payload bytes used for keepalive messages to prevent connection timeouts.
pub(crate) const KEEPALIVE_PAYLOAD: &[u8] = b"keepalive";

/// Capacity of the internal channel used by the message receiving system.
pub(crate) const MESSAGE_RECV_CHANNEL_CAPACITY: usize = 256;

/// Maximum number of concurrent in-flight request/response operations.
///
/// Also passed as `max_concurrent_operations` when `P2PNode::new` builds the
/// DHT network manager configuration.
pub(crate) const MAX_ACTIVE_REQUESTS: usize = 256;

/// Maximum allowed timeout for a single request (5 minutes).
pub(crate) const MAX_REQUEST_TIMEOUT: Duration = Duration::from_secs(300);

/// Fallback listen port used when `Config::listen_socket_addr()` returns an
/// error (see `NodeConfigBuilder::build` and `NodeConfig::default`).
const DEFAULT_LISTEN_PORT: u16 = 9000;

/// DHT max XOR distance (full 160-bit keyspace).
///
/// Passed as `max_distance` to the DHT core configuration in `P2PNode::new`.
const DHT_MAX_DISTANCE: u8 = 160;

/// Interval, in milliseconds, for the P2PNode run loop's maintenance tick.
const RUN_LOOP_TICK_INTERVAL_MS: u64 = 100;

/// Quality score for successful bootstrap connections.
const BOOTSTRAP_QUALITY_SCORE_SUCCESS: f64 = 0.8;
/// Quality score for failed bootstrap connections.
const BOOTSTRAP_QUALITY_SCORE_FAILURE: f64 = 0.2;
/// Default uptime score for new bootstrap contacts.
const BOOTSTRAP_DEFAULT_UPTIME_SCORE: f64 = 0.5;
/// Penalty duration, in hours, for failed bootstrap connections.
const BOOTSTRAP_FAILURE_PENALTY_HOURS: i64 = 1;

/// Default neutral trust score when trust engine is unavailable.
const DEFAULT_NEUTRAL_TRUST: f64 = 0.5;

/// Number of cached bootstrap peers to retrieve.
const BOOTSTRAP_PEER_BATCH_SIZE: usize = 20;
91
/// Configuration for a P2P node
///
/// Construct one with [`NodeConfig::new`], [`NodeConfig::builder`],
/// [`NodeConfig::from_config`], [`NodeConfig::with_listen_addr`], or
/// `NodeConfig::default()`, then hand it to `P2PNode::new`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeConfig {
    /// Local peer ID for this node
    ///
    /// When `None`, `P2PNode::new` generates a random ID of the form
    /// `peer_<first 8 characters of a v4 UUID>`.
    pub peer_id: Option<PeerId>,

    /// Addresses to listen on for incoming connections
    pub listen_addrs: Vec<std::net::SocketAddr>,

    /// Primary listen address (for compatibility)
    ///
    /// This is the address `P2PNode::new` forwards to the transport handle.
    pub listen_addr: std::net::SocketAddr,

    /// Bootstrap peers to connect to on startup (legacy)
    pub bootstrap_peers: Vec<std::net::SocketAddr>,

    /// Bootstrap peers as strings (for integration tests)
    ///
    /// `P2PNode::new` also derives the EigenTrust pre-trusted node set from
    /// these strings.
    pub bootstrap_peers_str: Vec<String>,

    /// Enable IPv6 support
    pub enable_ipv6: bool,

    // MCP removed; will be redesigned later
    /// Connection timeout duration
    ///
    /// Also used as the DHT manager's request timeout in `P2PNode::new`.
    pub connection_timeout: Duration,

    /// Keep-alive interval for connections
    pub keep_alive_interval: Duration,

    /// Maximum number of concurrent connections
    pub max_connections: usize,

    /// Maximum number of incoming connections
    pub max_incoming_connections: usize,

    /// DHT configuration
    pub dht_config: DHTConfig,

    /// Security configuration
    pub security_config: SecurityConfig,

    /// Production hardening configuration
    ///
    /// When `Some`, `P2PNode::new` creates a `ResourceManager` from it.
    pub production_config: Option<ProductionConfig>,

    /// Bootstrap cache configuration
    ///
    /// When `None`, `P2PNode::new` falls back to `CacheConfig::default()`.
    pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,

    /// Optional IP diversity configuration for Sybil protection tuning.
    ///
    /// When set, this configuration is used by bootstrap peer discovery and
    /// other diversity-enforcing subsystems. If `None`, defaults are used.
    pub diversity_config: Option<crate::security::IPDiversityConfig>,

    /// Stale peer threshold - peers with no activity for this duration are considered stale.
    /// Defaults to 60 seconds. Can be reduced for testing purposes.
    ///
    /// The serde default applies when the field is missing from deserialized input.
    #[serde(default = "default_stale_peer_threshold")]
    pub stale_peer_threshold: Duration,

    /// Optional override for the maximum application-layer message size.
    ///
    /// When `None`, the transport crate default is used (1 MiB).
    /// The value is forwarded unchanged to the transport configuration.
    #[serde(default)]
    pub max_message_size: Option<usize>,
}
155
/// Fallback value for `NodeConfig::stale_peer_threshold`: 60 seconds.
///
/// Referenced by name from the field's `#[serde(default = ...)]` attribute and
/// called directly by the `NodeConfig` constructors.
fn default_stale_peer_threshold() -> Duration {
    const STALE_PEER_THRESHOLD_SECS: u64 = 60;
    Duration::from_secs(STALE_PEER_THRESHOLD_SECS)
}
160
/// DHT-specific configuration
///
/// `P2PNode::new` translates these values into the DHT core configuration:
/// `k_value` feeds both the bucket size and the replication factor, and
/// `refresh_interval` feeds both the bucket refresh and republish intervals.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DHTConfig {
    /// Kademlia K parameter (bucket size)
    pub k_value: usize,

    /// Kademlia alpha parameter (parallelism)
    pub alpha_value: usize,

    /// DHT record TTL
    pub record_ttl: Duration,

    /// DHT refresh interval
    pub refresh_interval: Duration,
}
176
/// Security configuration
///
/// The `Default` implementation enables both noise and TLS and uses
/// [`TrustLevel::Basic`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SecurityConfig {
    /// Enable noise protocol for encryption
    pub enable_noise: bool,

    /// Enable TLS for secure transport
    pub enable_tls: bool,

    /// Trust level for peer verification
    pub trust_level: TrustLevel,
}
189
/// Trust level for peer verification
///
/// Selected through [`SecurityConfig::trust_level`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum TrustLevel {
    /// No verification required
    None,
    /// Basic peer ID verification
    Basic,
    /// Full cryptographic verification
    Full,
}
200
201// ============================================================================
202// Address Construction Helpers
203// ============================================================================
204
/// Produce the wildcard listen addresses for `port`.
///
/// The IPv4 wildcard (`0.0.0.0:port`) is always present. When `ipv6_enabled`
/// is true, the IPv6 wildcard (`[::]:port`) is placed in front of it.
#[inline]
fn build_listen_addrs(port: u16, ipv6_enabled: bool) -> Vec<std::net::SocketAddr> {
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};

    let v4_wildcard = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port);

    if ipv6_enabled {
        let v6_wildcard = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), port);
        // IPv6 comes first, matching the established ordering.
        vec![v6_wildcard, v4_wildcard]
    } else {
        vec![v4_wildcard]
    }
}
226
227impl NodeConfig {
228    /// Create a new NodeConfig with default values
229    ///
230    /// # Errors
231    ///
232    /// Returns an error if default addresses cannot be parsed
233    pub fn new() -> Result<Self> {
234        let config = Config::default();
235        let listen_addr = config.listen_socket_addr()?;
236
237        Ok(Self {
238            peer_id: None,
239            listen_addrs: build_listen_addrs(listen_addr.port(), config.network.ipv6_enabled),
240            listen_addr,
241            bootstrap_peers: Vec::new(),
242            bootstrap_peers_str: config.network.bootstrap_nodes.clone(),
243            enable_ipv6: config.network.ipv6_enabled,
244            connection_timeout: Duration::from_secs(config.network.connection_timeout),
245            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
246            max_connections: config.network.max_connections,
247            max_incoming_connections: config.security.connection_limit as usize,
248            dht_config: DHTConfig::default(),
249            security_config: SecurityConfig::default(),
250            production_config: None,
251            bootstrap_cache_config: None,
252            diversity_config: None,
253            stale_peer_threshold: default_stale_peer_threshold(),
254            max_message_size: None,
255        })
256    }
257
258    /// Create a builder for customized NodeConfig construction
259    pub fn builder() -> NodeConfigBuilder {
260        NodeConfigBuilder::default()
261    }
262}
263
264// ============================================================================
265// NodeConfig Builder Pattern
266// ============================================================================
267
/// Builder for constructing NodeConfig with fluent API
///
/// Every `Option` field left as `None` is filled from `Config::default()` (or
/// the relevant `Default` impl) when `build()` is called.
#[derive(Debug, Clone, Default)]
pub struct NodeConfigBuilder {
    /// Explicit peer ID; `None` leaves `NodeConfig::peer_id` unset.
    peer_id: Option<PeerId>,
    /// Port used for every generated listen address.
    listen_port: Option<u16>,
    /// Whether an IPv6 wildcard listen address is generated.
    enable_ipv6: Option<bool>,
    /// Bootstrap peers accumulated through `bootstrap_peer()`.
    bootstrap_peers: Vec<std::net::SocketAddr>,
    /// Override for the maximum number of concurrent connections.
    max_connections: Option<usize>,
    /// Override for the connection timeout.
    connection_timeout: Option<Duration>,
    /// Override for the keep-alive interval.
    keep_alive_interval: Option<Duration>,
    /// Override for the DHT configuration.
    dht_config: Option<DHTConfig>,
    /// Override for the security configuration.
    security_config: Option<SecurityConfig>,
    /// Production hardening configuration; passed through unchanged.
    production_config: Option<ProductionConfig>,
}
282
283impl NodeConfigBuilder {
284    /// Set the peer ID
285    pub fn peer_id(mut self, peer_id: PeerId) -> Self {
286        self.peer_id = Some(peer_id);
287        self
288    }
289
290    /// Set the listen port
291    pub fn listen_port(mut self, port: u16) -> Self {
292        self.listen_port = Some(port);
293        self
294    }
295
296    /// Enable or disable IPv6
297    pub fn ipv6(mut self, enabled: bool) -> Self {
298        self.enable_ipv6 = Some(enabled);
299        self
300    }
301
302    /// Add a bootstrap peer
303    pub fn bootstrap_peer(mut self, addr: std::net::SocketAddr) -> Self {
304        self.bootstrap_peers.push(addr);
305        self
306    }
307
308    /// Set maximum connections
309    pub fn max_connections(mut self, max: usize) -> Self {
310        self.max_connections = Some(max);
311        self
312    }
313
314    /// Set connection timeout
315    pub fn connection_timeout(mut self, timeout: Duration) -> Self {
316        self.connection_timeout = Some(timeout);
317        self
318    }
319
320    /// Set keep-alive interval
321    pub fn keep_alive_interval(mut self, interval: Duration) -> Self {
322        self.keep_alive_interval = Some(interval);
323        self
324    }
325
326    /// Set DHT configuration
327    pub fn dht_config(mut self, config: DHTConfig) -> Self {
328        self.dht_config = Some(config);
329        self
330    }
331
332    /// Set security configuration
333    pub fn security_config(mut self, config: SecurityConfig) -> Self {
334        self.security_config = Some(config);
335        self
336    }
337
338    /// Set production configuration
339    pub fn production_config(mut self, config: ProductionConfig) -> Self {
340        self.production_config = Some(config);
341        self
342    }
343
344    /// Build the NodeConfig
345    ///
346    /// # Errors
347    ///
348    /// Returns an error if address construction fails
349    pub fn build(self) -> Result<NodeConfig> {
350        let base_config = Config::default();
351        let default_port = base_config
352            .listen_socket_addr()
353            .map(|addr| addr.port())
354            .unwrap_or(DEFAULT_LISTEN_PORT);
355        let port = self.listen_port.unwrap_or(default_port);
356        let ipv6_enabled = self.enable_ipv6.unwrap_or(base_config.network.ipv6_enabled);
357
358        let listen_addr =
359            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), port);
360
361        Ok(NodeConfig {
362            peer_id: self.peer_id,
363            listen_addrs: build_listen_addrs(port, ipv6_enabled),
364            listen_addr,
365            bootstrap_peers: self.bootstrap_peers.clone(),
366            bootstrap_peers_str: self.bootstrap_peers.iter().map(|a| a.to_string()).collect(),
367            enable_ipv6: ipv6_enabled,
368            connection_timeout: self
369                .connection_timeout
370                .unwrap_or(Duration::from_secs(base_config.network.connection_timeout)),
371            keep_alive_interval: self
372                .keep_alive_interval
373                .unwrap_or(Duration::from_secs(base_config.network.keepalive_interval)),
374            max_connections: self
375                .max_connections
376                .unwrap_or(base_config.network.max_connections),
377            max_incoming_connections: base_config.security.connection_limit as usize,
378            dht_config: self.dht_config.unwrap_or_default(),
379            security_config: self.security_config.unwrap_or_default(),
380            production_config: self.production_config,
381            bootstrap_cache_config: None,
382            diversity_config: None,
383            stale_peer_threshold: default_stale_peer_threshold(),
384            max_message_size: None,
385        })
386    }
387}
388
389impl Default for NodeConfig {
390    fn default() -> Self {
391        let config = Config::default();
392        let listen_addr = config.listen_socket_addr().unwrap_or_else(|_| {
393            std::net::SocketAddr::new(
394                std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
395                DEFAULT_LISTEN_PORT,
396            )
397        });
398
399        Self {
400            peer_id: None,
401            listen_addrs: build_listen_addrs(listen_addr.port(), config.network.ipv6_enabled),
402            listen_addr,
403            bootstrap_peers: Vec::new(),
404            bootstrap_peers_str: Vec::new(),
405            enable_ipv6: config.network.ipv6_enabled,
406            connection_timeout: Duration::from_secs(config.network.connection_timeout),
407            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
408            max_connections: config.network.max_connections,
409            max_incoming_connections: config.security.connection_limit as usize,
410            dht_config: DHTConfig::default(),
411            security_config: SecurityConfig::default(),
412            production_config: None,
413            bootstrap_cache_config: None,
414            diversity_config: None,
415            stale_peer_threshold: default_stale_peer_threshold(),
416            max_message_size: None,
417        }
418    }
419}
420
421impl NodeConfig {
422    /// Create NodeConfig from Config
423    pub fn from_config(config: &Config) -> Result<Self> {
424        let listen_addr = config.listen_socket_addr()?;
425        let bootstrap_addrs = config.bootstrap_addrs()?;
426
427        let mut node_config = Self {
428            peer_id: None,
429            listen_addrs: vec![listen_addr],
430            listen_addr,
431            bootstrap_peers: bootstrap_addrs
432                .iter()
433                .map(|addr| addr.socket_addr())
434                .collect(),
435            bootstrap_peers_str: config
436                .network
437                .bootstrap_nodes
438                .iter()
439                .map(|addr| addr.to_string())
440                .collect(),
441            enable_ipv6: config.network.ipv6_enabled,
442
443            connection_timeout: Duration::from_secs(config.network.connection_timeout),
444            keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
445            max_connections: config.network.max_connections,
446            max_incoming_connections: config.security.connection_limit as usize,
447            dht_config: DHTConfig {
448                k_value: 20,
449                alpha_value: 3,
450                record_ttl: Duration::from_secs(3600),
451                refresh_interval: Duration::from_secs(900),
452            },
453            security_config: SecurityConfig {
454                enable_noise: true,
455                enable_tls: true,
456                trust_level: TrustLevel::Basic,
457            },
458            production_config: Some(ProductionConfig {
459                max_connections: config.network.max_connections,
460                max_memory_bytes: 0,  // unlimited
461                max_bandwidth_bps: 0, // unlimited
462                connection_timeout: Duration::from_secs(config.network.connection_timeout),
463                keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
464                health_check_interval: Duration::from_secs(30),
465                metrics_interval: Duration::from_secs(60),
466                enable_performance_tracking: true,
467                enable_auto_cleanup: true,
468                shutdown_timeout: Duration::from_secs(30),
469                rate_limits: crate::production::RateLimitConfig::default(),
470            }),
471            bootstrap_cache_config: None,
472            diversity_config: None,
473            stale_peer_threshold: default_stale_peer_threshold(),
474            max_message_size: None,
475        };
476
477        // Add IPv6 listen address if enabled
478        if config.network.ipv6_enabled {
479            node_config.listen_addrs.push(std::net::SocketAddr::new(
480                std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
481                listen_addr.port(),
482            ));
483        }
484
485        Ok(node_config)
486    }
487
488    /// Try to build a NodeConfig from a listen address string
489    pub fn with_listen_addr(addr: &str) -> Result<Self> {
490        let listen_addr: std::net::SocketAddr = addr
491            .parse()
492            .map_err(|e: std::net::AddrParseError| {
493                NetworkError::InvalidAddress(e.to_string().into())
494            })
495            .map_err(P2PError::Network)?;
496        let cfg = NodeConfig {
497            listen_addr,
498            listen_addrs: vec![listen_addr],
499            diversity_config: None,
500            ..Default::default()
501        };
502        Ok(cfg)
503    }
504}
505
/// Default values used by `DHTConfig::default()`.
impl DHTConfig {
    /// Default Kademlia K parameter (bucket size).
    const DEFAULT_K_VALUE: usize = 20;
    /// Default Kademlia alpha parameter (parallelism).
    const DEFAULT_ALPHA_VALUE: usize = 5;
    /// Default record TTL, in seconds (one hour).
    const DEFAULT_RECORD_TTL_SECS: u64 = 3600;
    /// Default refresh interval, in seconds (ten minutes).
    const DEFAULT_REFRESH_INTERVAL_SECS: u64 = 600;
}
512
513impl Default for DHTConfig {
514    fn default() -> Self {
515        Self {
516            k_value: Self::DEFAULT_K_VALUE,
517            alpha_value: Self::DEFAULT_ALPHA_VALUE,
518            record_ttl: Duration::from_secs(Self::DEFAULT_RECORD_TTL_SECS),
519            refresh_interval: Duration::from_secs(Self::DEFAULT_REFRESH_INTERVAL_SECS),
520        }
521    }
522}
523
524impl Default for SecurityConfig {
525    fn default() -> Self {
526        Self {
527            enable_noise: true,
528            enable_tls: true,
529            trust_level: TrustLevel::Basic,
530        }
531    }
532}
533
/// Information about a connected peer
///
/// Both timestamps are `tokio::time::Instant` values (the `Instant` imported
/// at the top of this file), not wall-clock times.
#[derive(Debug, Clone)]
pub struct PeerInfo {
    /// Peer identifier
    pub peer_id: PeerId,

    /// Peer's addresses
    pub addresses: Vec<String>,

    /// Connection timestamp
    pub connected_at: Instant,

    /// Last seen timestamp
    pub last_seen: Instant,

    /// Connection status
    pub status: ConnectionStatus,

    /// Supported protocols
    pub protocols: Vec<String>,

    /// Number of heartbeats received
    pub heartbeat_count: u64,
}
558
/// Connection status for a peer
///
/// Stored in [`PeerInfo::status`].
#[derive(Debug, Clone, PartialEq)]
pub enum ConnectionStatus {
    /// Connection is being established
    Connecting,
    /// Connection is established and active
    Connected,
    /// Connection is being closed
    Disconnecting,
    /// Connection is closed
    Disconnected,
    /// Connection failed
    ///
    /// NOTE(review): the `String` payload is presumably a human-readable
    /// failure description — confirm against the code that constructs it.
    Failed(String),
}
573
/// Network events that can occur
///
/// This enum is separate from [`P2PEvent`], which carries only message and
/// peer connect/disconnect notifications.
#[derive(Debug, Clone)]
pub enum NetworkEvent {
    /// A new peer has connected
    PeerConnected {
        /// The identifier of the newly connected peer
        peer_id: PeerId,
        /// The network addresses where the peer can be reached
        addresses: Vec<String>,
    },

    /// A peer has disconnected
    PeerDisconnected {
        /// The identifier of the disconnected peer
        peer_id: PeerId,
        /// The reason for the disconnection
        reason: String,
    },

    /// A message was received from a peer
    MessageReceived {
        /// The identifier of the sending peer
        peer_id: PeerId,
        /// The protocol used for the message
        protocol: String,
        /// The raw message data
        data: Vec<u8>,
    },

    /// A connection attempt failed
    ConnectionFailed {
        /// The identifier of the peer (if known)
        peer_id: Option<PeerId>,
        /// The address where connection was attempted
        address: String,
        /// The error message describing the failure
        error: String,
    },

    /// DHT record was stored
    DHTRecordStored {
        /// The DHT key where the record was stored
        key: Vec<u8>,
        /// The value that was stored
        value: Vec<u8>,
    },

    /// DHT record was retrieved
    DHTRecordRetrieved {
        /// The DHT key that was queried
        key: Vec<u8>,
        /// The retrieved value, if found (`None` when the lookup found nothing)
        value: Option<Vec<u8>>,
    },
}
629
/// Network events that can occur in the P2P system
///
/// Events are broadcast to all listeners and provide real-time
/// notifications of network state changes and message arrivals.
///
/// The type is `Clone`, so each listener can receive its own copy.
#[derive(Debug, Clone)]
pub enum P2PEvent {
    /// Message received from a peer on a specific topic
    Message {
        /// Topic or channel the message was sent on
        topic: String,
        /// Peer ID of the message sender
        source: PeerId,
        /// Raw message data payload
        data: Vec<u8>,
    },
    /// A new peer has connected to the network
    PeerConnected(PeerId),
    /// A peer has disconnected from the network
    PeerDisconnected(PeerId),
}
650
/// Response from a peer to a request sent via [`P2PNode::send_request`].
///
/// Contains the response payload along with metadata about the responder
/// and round-trip latency.
#[derive(Debug, Clone)]
pub struct PeerResponse {
    /// The peer that sent the response.
    pub peer_id: PeerId,
    /// Raw response payload bytes, uninterpreted by this type.
    pub data: Vec<u8>,
    /// Round-trip latency from request to response.
    pub latency: Duration,
}
664
/// Wire format for request/response correlation.
///
/// Wraps application payloads with a message ID and direction flag
/// so the receive loop can route responses back to waiting callers.
///
/// NOTE(review): derives serde traits like `WireMessage`; if it is also
/// bincode-encoded, field order is presumably part of the wire format —
/// confirm before reordering.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct RequestResponseEnvelope {
    /// Unique identifier to correlate request ↔ response.
    pub(crate) message_id: String,
    /// `false` for requests, `true` for responses.
    pub(crate) is_response: bool,
    /// Application payload.
    pub(crate) payload: Vec<u8>,
}
678
/// An in-flight request awaiting a response from a specific peer.
///
/// Holds the sending half of a oneshot channel, so a response can be
/// delivered at most once per request.
pub(crate) struct PendingRequest {
    /// Oneshot sender for delivering the response payload.
    pub(crate) response_tx: tokio::sync::oneshot::Sender<Vec<u8>>,
    /// The peer we expect the response from (for origin validation).
    pub(crate) expected_peer: String,
}
686
/// Main P2P network node that manages connections, routing, and communication
///
/// This struct represents a complete P2P network participant that can:
/// - Connect to other peers via QUIC transport
/// - Participate in distributed hash table (DHT) operations
/// - Send and receive messages through various protocols
/// - Handle network events and peer lifecycle
///
/// Transport concerns (connections, messaging, events) are delegated to
/// [`TransportHandle`](crate::transport_handle::TransportHandle).
///
/// Instances are created with `P2PNode::new`, which consumes a [`NodeConfig`].
pub struct P2PNode {
    /// Node configuration
    config: NodeConfig,

    /// Our peer ID
    ///
    /// Taken from `NodeConfig::peer_id`, or randomly generated by
    /// `P2PNode::new` when that is `None`.
    peer_id: PeerId,

    /// Transport handle owning all QUIC / peer / event state
    ///
    /// The same `Arc` is also handed to the DHT manager in `P2PNode::new`.
    transport: Arc<crate::transport_handle::TransportHandle>,

    /// Node start time
    start_time: Instant,

    /// Shutdown token — cancelled when the node should stop
    shutdown: CancellationToken,

    /// DHT manager for distributed hash table operations (peer discovery, routing, storage)
    dht_manager: Arc<DhtNetworkManager>,

    /// Production resource manager (optional)
    ///
    /// Present only when `NodeConfig::production_config` is `Some`.
    resource_manager: Option<Arc<ResourceManager>>,

    /// Bootstrap cache manager for peer discovery
    ///
    /// `None` when the manager failed to initialize; `P2PNode::new` logs a
    /// warning and continues without the cache in that case.
    bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,

    /// Security dashboard for monitoring
    pub security_dashboard: Option<Arc<crate::dht::metrics::SecurityDashboard>>,

    /// Bootstrap state tracking - indicates whether peer discovery has completed
    is_bootstrapped: Arc<AtomicBool>,

    /// Whether `start()` has been called (and `stop()` has not yet completed)
    is_started: Arc<AtomicBool>,

    /// EigenTrust engine for reputation management
    ///
    /// Used to track peer reliability based on data availability outcomes.
    /// Consumers (like saorsa-node) should report successes and failures
    /// via `report_peer_success()` and `report_peer_failure()` methods.
    trust_engine: Option<Arc<EigenTrustEngine>>,
}
738
/// Rewrite a wildcard (unspecified) socket address to the matching loopback address.
///
/// Unspecified addresses (`0.0.0.0` and `[::]`) are only meaningful for
/// binding; they cannot be dialed, and ant-quic rejects them as connection
/// targets. For local connections they are mapped as follows:
/// - IPv4 `0.0.0.0:port` → `127.0.0.1:port`
/// - IPv6 `[::]:port` → `[::1]:port`
/// - anything else is returned unchanged
///
/// # Arguments
/// * `addr` - The socket address to normalize
///
/// # Returns
/// * A socket address suitable for use as a connection target; the port is
///   always preserved
pub(crate) fn normalize_wildcard_to_loopback(addr: std::net::SocketAddr) -> std::net::SocketAddr {
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};

    let loopback_ip = match addr.ip() {
        IpAddr::V4(ip) if ip.is_unspecified() => IpAddr::V4(Ipv4Addr::LOCALHOST), // 127.0.0.1
        IpAddr::V6(ip) if ip.is_unspecified() => IpAddr::V6(Ipv6Addr::LOCALHOST), // ::1
        // Concrete address: nothing to normalize.
        _ => return addr,
    };

    SocketAddr::new(loopback_ip, addr.port())
}
769
770impl P2PNode {
771    /// Create a new P2P node with the given configuration
772    pub async fn new(config: NodeConfig) -> Result<Self> {
773        let peer_id = config.peer_id.clone().unwrap_or_else(|| {
774            // Generate a random peer ID for now
775            // Safe: UUID v4 canonical format is always 36 characters
776            let uuid_str = uuid::Uuid::new_v4().to_string();
777            format!("peer_{}", &uuid_str[..8])
778        });
779
780        // Initialize and register a TrustWeightedKademlia DHT for the global API
781        // Use a deterministic local NodeId derived from the peer_id
782        {
783            let nid = crate::dht::derive_dht_key_from_peer_id(&peer_id);
784            let _twdht = std::sync::Arc::new(crate::dht::TrustWeightedKademlia::new(
785                crate::identity::node_identity::NodeId::from_bytes(nid),
786            ));
787            // TODO: Update to use new clean API
788            // let _ = crate::api::set_dht_instance(twdht);
789        }
790
791        // Initialize production resource manager if configured
792        let resource_manager = config
793            .production_config
794            .clone()
795            .map(|prod_config| Arc::new(ResourceManager::new(prod_config)));
796
797        // Initialize bootstrap cache manager
798        let diversity_config = config.diversity_config.clone().unwrap_or_default();
799        let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
800            match BootstrapManager::with_full_config(
801                cache_config.clone(),
802                crate::rate_limit::JoinRateLimiterConfig::default(),
803                diversity_config.clone(),
804            )
805            .await
806            {
807                Ok(manager) => Some(Arc::new(RwLock::new(manager))),
808                Err(e) => {
809                    warn!(
810                        "Failed to initialize bootstrap manager: {}, continuing without cache",
811                        e
812                    );
813                    None
814                }
815            }
816        } else {
817            match BootstrapManager::with_full_config(
818                crate::bootstrap::CacheConfig::default(),
819                crate::rate_limit::JoinRateLimiterConfig::default(),
820                diversity_config,
821            )
822            .await
823            {
824                Ok(manager) => Some(Arc::new(RwLock::new(manager))),
825                Err(e) => {
826                    warn!(
827                        "Failed to initialize bootstrap manager: {}, continuing without cache",
828                        e
829                    );
830                    None
831                }
832            }
833        };
834
835        // Initialize EigenTrust engine for reputation management
836        let trust_engine = {
837            let mut pre_trusted = HashSet::new();
838            for bootstrap_peer in &config.bootstrap_peers_str {
839                let node_id_bytes = crate::dht::derive_dht_key_from_peer_id(bootstrap_peer);
840                pre_trusted.insert(NodeId::from_bytes(node_id_bytes));
841            }
842
843            let engine = Arc::new(EigenTrustEngine::new(pre_trusted));
844            engine.clone().start_background_updates();
845            Some(engine)
846        };
847
848        // Build transport handle with all transport-level concerns
849        let transport_config = crate::transport_handle::TransportConfig {
850            peer_id: peer_id.clone(),
851            listen_addr: config.listen_addr,
852            enable_ipv6: config.enable_ipv6,
853            connection_timeout: config.connection_timeout,
854            stale_peer_threshold: config.stale_peer_threshold,
855            max_connections: config.max_connections,
856            production_config: config.production_config.clone(),
857            event_channel_capacity: crate::DEFAULT_EVENT_CHANNEL_CAPACITY,
858            max_message_size: config.max_message_size,
859        };
860        let transport =
861            Arc::new(crate::transport_handle::TransportHandle::new(transport_config).await?);
862
863        // Initialize DHT manager (owns local DHT core and network DHT behavior)
864        let manager_dht_config = crate::dht::DHTConfig {
865            replication_factor: config.dht_config.k_value,
866            bucket_size: config.dht_config.k_value,
867            alpha: config.dht_config.alpha_value,
868            record_ttl: config.dht_config.record_ttl,
869            bucket_refresh_interval: config.dht_config.refresh_interval,
870            republish_interval: config.dht_config.refresh_interval,
871            max_distance: DHT_MAX_DISTANCE,
872        };
873        let dht_manager_config = DhtNetworkConfig {
874            local_peer_id: peer_id.clone(),
875            dht_config: manager_dht_config,
876            node_config: config.clone(),
877            request_timeout: config.connection_timeout,
878            max_concurrent_operations: MAX_ACTIVE_REQUESTS,
879            replication_factor: config.dht_config.k_value,
880            enable_security: true,
881        };
882        let dht_manager = Arc::new(
883            DhtNetworkManager::new(transport.clone(), trust_engine.clone(), dht_manager_config)
884                .await?,
885        );
886
887        let security_metrics = dht_manager.security_metrics().await;
888        let security_dashboard = Some(Arc::new(crate::dht::metrics::SecurityDashboard::new(
889            security_metrics,
890            Arc::new(crate::dht::metrics::DhtMetricsCollector::new()),
891            Arc::new(crate::dht::metrics::TrustMetricsCollector::new()),
892            Arc::new(crate::dht::metrics::PlacementMetricsCollector::new()),
893        )));
894
895        let node = Self {
896            config,
897            peer_id,
898            transport,
899            start_time: Instant::now(),
900            shutdown: CancellationToken::new(),
901            dht_manager,
902            resource_manager,
903            bootstrap_manager,
904            security_dashboard,
905            is_bootstrapped: Arc::new(AtomicBool::new(false)),
906            is_started: Arc::new(AtomicBool::new(false)),
907            trust_engine,
908        };
909        info!(
910            "Created P2P node with peer ID: {} (call start() to begin networking)",
911            node.peer_id
912        );
913
914        Ok(node)
915    }
916
    /// Create a new node builder.
    ///
    /// Returns the value produced by `NodeBuilder::new()`; no node is created
    /// until the builder is used.
    pub fn builder() -> NodeBuilder {
        NodeBuilder::new()
    }
921
    /// Get the peer ID of this node.
    ///
    /// Borrows the `peer_id` field stored on the node; nothing is cloned.
    pub fn peer_id(&self) -> &PeerId {
        &self.peer_id
    }
926
    /// Get the transport handle for sharing with other components.
    ///
    /// Returns a reference to the node's `Arc`; callers that need an owned
    /// handle can clone it.
    pub fn transport(&self) -> &Arc<crate::transport_handle::TransportHandle> {
        &self.transport
    }
931
    /// Get the hex-encoded transport-level peer ID.
    ///
    /// Forwards to `TransportHandle::transport_peer_id`.
    /// NOTE(review): the conditions under which the transport returns `None`
    /// are not visible here — confirm against the transport implementation.
    pub fn transport_peer_id(&self) -> Option<String> {
        self.transport.transport_peer_id()
    }
936
    /// Get the node's local address as reported by the transport.
    ///
    /// Forwards to `TransportHandle::local_addr`.
    /// NOTE(review): the string format and the `None` case are decided by the
    /// transport — presumably `None` means no listener is bound yet; confirm.
    pub fn local_addr(&self) -> Option<String> {
        self.transport.local_addr()
    }
940
    /// Check if the node has completed the initial bootstrap process
    ///
    /// Returns `true` if the node has successfully connected to at least one
    /// bootstrap peer and performed peer discovery (FIND_NODE).
    ///
    /// This is a plain read of the `is_bootstrapped` atomic flag (sequentially
    /// consistent load); it performs no network activity.
    pub fn is_bootstrapped(&self) -> bool {
        self.is_bootstrapped.load(Ordering::SeqCst)
    }
948
    /// Manually trigger re-bootstrap (useful for recovery or network rejoin)
    ///
    /// This clears the bootstrapped state and attempts to reconnect to
    /// bootstrap peers and discover new peers.
    ///
    /// The flag is cleared *before* the connection attempt, so
    /// `is_bootstrapped()` reports `false` while the attempt is in flight.
    /// NOTE(review): the flag is presumably set again inside
    /// `connect_bootstrap_peers` on success — confirm.
    ///
    /// # Errors
    ///
    /// Returns whatever error `connect_bootstrap_peers` produces.
    pub async fn re_bootstrap(&self) -> Result<()> {
        self.is_bootstrapped.store(false, Ordering::SeqCst);
        self.connect_bootstrap_peers().await
    }
957
958    // =========================================================================
959    // Trust API - EigenTrust Reputation System
960    // =========================================================================
961
962    /// Get the EigenTrust engine for direct trust operations
963    ///
964    /// This provides access to the underlying trust engine for advanced use cases.
965    /// For simple success/failure reporting, prefer `report_peer_success()` and
966    /// `report_peer_failure()`.
967    ///
968    ///
969    /// # Example
970    ///
971    /// ```rust,ignore
972    /// if let Some(engine) = node.trust_engine() {
973    ///     // Update node statistics directly
974    ///     engine.update_node_stats(&peer_id, NodeStatisticsUpdate::StorageContributed(1024)).await;
975    ///
976    ///     // Get global trust scores
977    ///     let scores = engine.compute_global_trust().await;
978    /// }
979    /// ```
980    pub fn trust_engine(&self) -> Option<Arc<EigenTrustEngine>> {
981        self.trust_engine.clone()
982    }
983
    /// Canonical conversion from PeerId string to adaptive NodeId for trust.
    ///
    /// Delegates to the standalone [`peer_id_to_trust_node_id`] function in
    /// this module, so the associated function and the free function always
    /// produce the same mapping.
    fn peer_id_to_trust_node_id(peer_id: &str) -> AdaptiveNodeId {
        crate::network::peer_id_to_trust_node_id(peer_id)
    }
990
991    /// Report a successful interaction with a peer
992    ///
993    /// Call this after successful data operations to increase the peer's trust score.
994    /// This is the primary method for saorsa-node to report positive outcomes.
995    ///
996    ///
997    /// # Arguments
998    ///
999    /// * `peer_id` - The peer ID (as a string) of the node that performed well
1000    ///
1001    /// # Example
1002    ///
1003    /// ```rust,ignore
1004    /// // After successfully retrieving a chunk from a peer
1005    /// if let Ok(chunk) = fetch_chunk_from(&peer_id).await {
1006    ///     node.report_peer_success(&peer_id).await?;
1007    /// }
1008    /// ```
1009    pub async fn report_peer_success(&self, peer_id: &str) -> Result<()> {
1010        if let Some(ref engine) = self.trust_engine {
1011            let node_id = Self::peer_id_to_trust_node_id(peer_id);
1012
1013            engine
1014                .update_node_stats(&node_id, NodeStatisticsUpdate::CorrectResponse)
1015                .await;
1016            Ok(())
1017        } else {
1018            // Trust engine not initialized - this is not an error, just a no-op
1019            Ok(())
1020        }
1021    }
1022
    /// Report a failed interaction with a peer
    ///
    /// Call this after failed data operations to decrease the peer's trust score.
    /// This includes timeouts, corrupted data, or refused connections.
    ///
    /// This is shorthand for `report_peer_failure_with_reason` with
    /// `PeerFailureReason::ConnectionFailed`; use that method directly when
    /// the specific cause of the failure is known.
    ///
    /// # Arguments
    ///
    /// * `peer_id` - The peer ID (as a string) of the node that failed
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// // After a chunk retrieval fails
    /// match fetch_chunk_from(&peer_id).await {
    ///     Ok(chunk) => node.report_peer_success(&peer_id).await?,
    ///     Err(_) => node.report_peer_failure(&peer_id).await?,
    /// }
    /// ```
    pub async fn report_peer_failure(&self, peer_id: &str) -> Result<()> {
        // Delegate to the enriched version with a generic transport-level reason
        self.report_peer_failure_with_reason(peer_id, PeerFailureReason::ConnectionFailed)
            .await
    }
1047
1048    /// Report a failed interaction with a peer, providing a specific failure reason.
1049    ///
1050    /// This is the enriched version of [`P2PNode::report_peer_failure`] that maps the failure
1051    /// reason to the appropriate trust penalty. Use this when you know *why* the
1052    /// interaction failed to give the trust engine more accurate data.
1053    ///
1054    /// - Transport-level failures (`Timeout`, `ConnectionFailed`) map to `FailedResponse`
1055    /// - `DataUnavailable` maps to `DataUnavailable`
1056    /// - `CorruptedData` maps to `CorruptedData` (counts as 2 failures)
1057    /// - `ProtocolError` maps to `ProtocolViolation` (counts as 2 failures)
1058    /// - `Refused` maps to `FailedResponse`
1059    ///
1060    /// Requires the `adaptive-ml` feature to be enabled.
1061    ///
1062    /// # Arguments
1063    ///
1064    /// * `peer_id` - The peer ID of the node that failed
1065    /// * `reason` - Why the interaction failed
1066    ///
1067    /// # Example
1068    ///
1069    /// ```rust,ignore
1070    /// use saorsa_core::error::PeerFailureReason;
1071    ///
1072    /// // After a chunk retrieval returns corrupted data
1073    /// node.report_peer_failure_with_reason(&peer_id, PeerFailureReason::CorruptedData).await?;
1074    /// ```
1075    pub async fn report_peer_failure_with_reason(
1076        &self,
1077        peer_id: &str,
1078        reason: PeerFailureReason,
1079    ) -> Result<()> {
1080        if let Some(ref engine) = self.trust_engine {
1081            let node_id = Self::peer_id_to_trust_node_id(peer_id);
1082
1083            let update = match reason {
1084                PeerFailureReason::Timeout | PeerFailureReason::ConnectionFailed => {
1085                    NodeStatisticsUpdate::FailedResponse
1086                }
1087                PeerFailureReason::DataUnavailable => NodeStatisticsUpdate::DataUnavailable,
1088                PeerFailureReason::CorruptedData => NodeStatisticsUpdate::CorruptedData,
1089                PeerFailureReason::ProtocolError => NodeStatisticsUpdate::ProtocolViolation,
1090                PeerFailureReason::Refused => NodeStatisticsUpdate::FailedResponse,
1091            };
1092
1093            engine.update_node_stats(&node_id, update).await;
1094            Ok(())
1095        } else {
1096            // Trust engine not initialized - this is not an error, just a no-op
1097            Ok(())
1098        }
1099    }
1100
1101    /// Get the current trust score for a peer
1102    ///
1103    /// Returns a value between 0.0 (untrusted) and 1.0 (fully trusted).
1104    /// Unknown peers return 0.0 by default.
1105    ///
1106    ///
1107    /// # Arguments
1108    ///
1109    /// * `peer_id` - The peer ID (as a string) to query
1110    ///
1111    /// # Example
1112    ///
1113    /// ```rust,ignore
1114    /// let trust = node.peer_trust(&peer_id);
1115    /// if trust < 0.3 {
1116    ///     tracing::warn!("Low trust peer: {}", peer_id);
1117    /// }
1118    /// ```
1119    pub fn peer_trust(&self, peer_id: &str) -> f64 {
1120        if let Some(ref engine) = self.trust_engine {
1121            let node_id = Self::peer_id_to_trust_node_id(peer_id);
1122
1123            engine.get_trust(&node_id)
1124        } else {
1125            // Trust engine not initialized - return neutral trust
1126            DEFAULT_NEUTRAL_TRUST
1127        }
1128    }
1129
1130    // =========================================================================
1131    // Request/Response API — Automatic Trust Feedback
1132    // =========================================================================
1133
1134    /// Send a request to a peer and wait for a response with automatic trust reporting.
1135    ///
1136    /// Unlike fire-and-forget `send_message()`, this method:
1137    /// 1. Wraps the payload in a `RequestResponseEnvelope` with a unique message ID
1138    /// 2. Sends it on the `/rr/<protocol>` protocol prefix
1139    /// 3. Waits for a matching response (or timeout)
1140    /// 4. Automatically reports success or failure to the trust engine
1141    ///
1142    /// The remote peer's handler should call `send_response()` with the
1143    /// incoming message ID to route the response back.
1144    ///
1145    /// # Arguments
1146    ///
1147    /// * `peer_id` - Target peer
1148    /// * `protocol` - Application protocol name (e.g. `"chunk_fetch"`)
1149    /// * `data` - Request payload bytes
1150    /// * `timeout` - Maximum time to wait for a response
1151    ///
1152    /// # Returns
1153    ///
1154    /// A [`PeerResponse`] on success, or an error on timeout / connection failure.
1155    ///
1156    /// # Example
1157    ///
1158    /// ```rust,ignore
1159    /// let response = node.send_request(&peer_id, "chunk_fetch", chunk_id.to_vec(), Duration::from_secs(10)).await?;
1160    /// println!("Got {} bytes from {}", response.data.len(), response.peer_id);
1161    /// ```
1162    pub async fn send_request(
1163        &self,
1164        peer_id: &PeerId,
1165        protocol: &str,
1166        data: Vec<u8>,
1167        timeout: Duration,
1168    ) -> Result<PeerResponse> {
1169        match self
1170            .transport
1171            .send_request(peer_id, protocol, data, timeout)
1172            .await
1173        {
1174            Ok(resp) => {
1175                let _ = self.report_peer_success(peer_id).await;
1176                Ok(resp)
1177            }
1178            Err(e) => {
1179                // Choose the right failure reason based on the error type
1180                let reason = if matches!(&e, P2PError::Timeout(_)) {
1181                    PeerFailureReason::Timeout
1182                } else {
1183                    PeerFailureReason::ConnectionFailed
1184                };
1185                let _ = self.report_peer_failure_with_reason(peer_id, reason).await;
1186                Err(e)
1187            }
1188        }
1189    }
1190
    /// Send a response to a request previously received from `peer_id`.
    ///
    /// Forwards all arguments unchanged to `TransportHandle::send_response`.
    ///
    /// # Arguments
    ///
    /// * `peer_id` - Peer that issued the request
    /// * `protocol` - Application protocol name
    /// * `message_id` - ID of the request being answered; presumably the ID
    ///   extracted from the incoming request envelope — confirm against the
    ///   transport
    /// * `data` - Response payload bytes
    ///
    /// # Errors
    ///
    /// Returns whatever error the transport reports.
    pub async fn send_response(
        &self,
        peer_id: &PeerId,
        protocol: &str,
        message_id: &str,
        data: Vec<u8>,
    ) -> Result<()> {
        self.transport
            .send_response(peer_id, protocol, message_id, data)
            .await
    }
1202
    /// Parse a request/response envelope from raw bytes.
    ///
    /// Forwards to `TransportHandle::parse_request_envelope`.
    /// NOTE(review): the meaning of the returned tuple is defined by the
    /// transport — presumably `(message_id, is_response, payload)`, with
    /// `None` when the bytes are not a valid envelope; confirm.
    pub fn parse_request_envelope(data: &[u8]) -> Option<(String, bool, Vec<u8>)> {
        crate::transport_handle::TransportHandle::parse_request_envelope(data)
    }
1206
    /// Subscribe to a topic.
    ///
    /// Forwards to `TransportHandle::subscribe` and returns its result.
    pub async fn subscribe(&self, topic: &str) -> Result<()> {
        self.transport.subscribe(topic).await
    }
1210
    /// Publish `data` on a topic.
    ///
    /// Forwards to `TransportHandle::publish` and returns its result.
    /// NOTE(review): which peers receive the data is decided by the
    /// transport and is not visible here.
    pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
        self.transport.publish(topic, data).await
    }
1214
    /// Get the node configuration.
    ///
    /// Borrows the `NodeConfig` the node was constructed with.
    pub fn config(&self) -> &NodeConfig {
        &self.config
    }
1219
1220    /// Start the P2P node
1221    pub async fn start(&self) -> Result<()> {
1222        info!("Starting P2P node...");
1223
1224        // Start production resource manager if configured
1225        if let Some(ref resource_manager) = self.resource_manager {
1226            resource_manager
1227                .start()
1228                .await
1229                .map_err(|e| protocol_error(format!("Failed to start resource manager: {e}")))?;
1230            info!("Production resource manager started");
1231        }
1232
1233        // Start bootstrap manager background tasks
1234        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1235            let mut manager = bootstrap_manager.write().await;
1236            manager
1237                .start_background_tasks()
1238                .await
1239                .map_err(|e| protocol_error(format!("Failed to start bootstrap manager: {e}")))?;
1240            info!("Bootstrap cache manager started");
1241        }
1242
1243        // Start transport listeners and message receiving
1244        self.transport.start_network_listeners().await?;
1245
1246        // Start the attached DHT manager.
1247        Arc::clone(&self.dht_manager).start().await?;
1248
1249        // Log current listen addresses
1250        let listen_addrs = self.transport.listen_addrs().await;
1251        info!("P2P node started on addresses: {:?}", listen_addrs);
1252
1253        // NOTE: Message receiving is now integrated into the accept loop in start_network_listeners()
1254        // The old start_message_receiving_system() is no longer needed as it competed with the accept
1255        // loop for incoming connections, causing messages to be lost.
1256
1257        // Connect to bootstrap peers
1258        self.connect_bootstrap_peers().await?;
1259
1260        self.is_started
1261            .store(true, std::sync::atomic::Ordering::Release);
1262
1263        Ok(())
1264    }
1265
1266    // start_network_listeners and start_message_receiving_system
1267    // are now implemented in TransportHandle
1268
1269    /// Run the P2P node (blocks until shutdown)
1270    pub async fn run(&self) -> Result<()> {
1271        if !self.is_running() {
1272            self.start().await?;
1273        }
1274
1275        info!("P2P node running...");
1276
1277        let mut interval = tokio::time::interval(Duration::from_millis(RUN_LOOP_TICK_INTERVAL_MS));
1278        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1279
1280        // Main event loop
1281        loop {
1282            tokio::select! {
1283                _ = interval.tick() => {
1284                    self.transport.maintenance_tick().await?;
1285                }
1286                () = self.shutdown.cancelled() => {
1287                    break;
1288                }
1289            }
1290        }
1291
1292        info!("P2P node stopped");
1293        Ok(())
1294    }
1295
1296    /// Stop the P2P node
1297    pub async fn stop(&self) -> Result<()> {
1298        info!("Stopping P2P node...");
1299
1300        // Signal the run loop to exit
1301        self.shutdown.cancel();
1302
1303        // Stop DHT manager first so leave messages can be sent while transport is still active.
1304        self.dht_manager.stop().await?;
1305
1306        // Stop the transport layer (shutdown endpoints, join tasks, disconnect peers)
1307        self.transport.stop().await?;
1308
1309        // Shutdown production resource manager if configured
1310        if let Some(ref resource_manager) = self.resource_manager {
1311            resource_manager
1312                .shutdown()
1313                .await
1314                .map_err(|e| protocol_error(format!("Failed to shutdown resource manager: {e}")))?;
1315            info!("Production resource manager stopped");
1316        }
1317
1318        self.is_started
1319            .store(false, std::sync::atomic::Ordering::Release);
1320
1321        info!("P2P node stopped");
1322        Ok(())
1323    }
1324
    /// Graceful shutdown alias for tests
    ///
    /// Identical to calling `stop()`; returns its result unchanged.
    pub async fn shutdown(&self) -> Result<()> {
        self.stop().await
    }
1329
1330    /// Check if the node is running
1331    pub fn is_running(&self) -> bool {
1332        self.is_started.load(std::sync::atomic::Ordering::Acquire) && !self.shutdown.is_cancelled()
1333    }
1334
    /// Get the current listen addresses
    ///
    /// Forwards to `TransportHandle::listen_addrs`.
    pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
        self.transport.listen_addrs().await
    }
1339
    /// Get connected peers
    ///
    /// Forwards to `TransportHandle::connected_peers` and returns the peer
    /// IDs it reports.
    pub async fn connected_peers(&self) -> Vec<PeerId> {
        self.transport.connected_peers().await
    }
1344
    /// Get peer count
    ///
    /// Forwards to `TransportHandle::peer_count`.
    pub async fn peer_count(&self) -> usize {
        self.transport.peer_count().await
    }
1349
    /// Get peer info
    ///
    /// Forwards to `TransportHandle::peer_info`; presumably `None` when the
    /// transport has no record of `peer_id` — confirm against the transport.
    pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
        self.transport.peer_info(peer_id).await
    }
1354
    /// Get the peer ID for a given socket address, if connected
    ///
    /// Forwards to `TransportHandle::get_peer_id_by_address`.
    /// NOTE(review): `addr` is passed through as a string; the expected
    /// format (presumably `ip:port`) is defined by the transport — confirm.
    pub async fn get_peer_id_by_address(&self, addr: &str) -> Option<PeerId> {
        self.transport.get_peer_id_by_address(addr).await
    }
1359
    /// List all active connections with their peer IDs and addresses
    ///
    /// Forwards to `TransportHandle::list_active_connections`. Each entry
    /// pairs a peer ID with the address strings the transport reports for it.
    pub async fn list_active_connections(&self) -> Vec<(PeerId, Vec<String>)> {
        self.transport.list_active_connections().await
    }
1364
    /// Remove a peer from the peers map
    ///
    /// Forwards to `TransportHandle::remove_peer`.
    /// NOTE(review): the returned `bool` presumably indicates whether the
    /// peer was present — confirm against the transport.
    pub async fn remove_peer(&self, peer_id: &PeerId) -> bool {
        self.transport.remove_peer(peer_id).await
    }
1369
    /// Check if a peer is connected
    ///
    /// Forwards to `TransportHandle::is_peer_connected`.
    pub async fn is_peer_connected(&self, peer_id: &PeerId) -> bool {
        self.transport.is_peer_connected(peer_id).await
    }
1374
    /// Connect to a peer
    ///
    /// Forwards `address` to `TransportHandle::connect_peer` and returns the
    /// peer ID it yields on success.
    ///
    /// # Errors
    ///
    /// Returns whatever error the transport reports.
    pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
        self.transport.connect_peer(address).await
    }
1379
    /// Disconnect from a peer
    ///
    /// Forwards to `TransportHandle::disconnect_peer`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the transport reports.
    pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
        self.transport.disconnect_peer(peer_id).await
    }
1384
    /// Check if a connection to a peer is active
    ///
    /// Forwards to `TransportHandle::is_connection_active`. Unlike the
    /// neighbouring peer methods, this one takes the peer ID as `&str`.
    pub async fn is_connection_active(&self, peer_id: &str) -> bool {
        self.transport.is_connection_active(peer_id).await
    }
1389
    /// Send a message to a peer
    ///
    /// Forwards to `TransportHandle::send_message`. No response is awaited
    /// and, unlike `send_request`, nothing is reported to the trust engine
    /// from this method.
    ///
    /// # Arguments
    ///
    /// * `peer_id` - Target peer
    /// * `protocol` - Protocol/topic identifier for the message
    /// * `data` - Payload bytes
    ///
    /// # Errors
    ///
    /// Returns whatever error the transport reports.
    pub async fn send_message(
        &self,
        peer_id: &PeerId,
        protocol: &str,
        data: Vec<u8>,
    ) -> Result<()> {
        self.transport.send_message(peer_id, protocol, data).await
    }
1399}
1400
// Timestamp bounds applied by `parse_protocol_message`.
//
// `parse_protocol_message` parses a postcard-encoded protocol message into a
// `P2PEvent::Message` and returns `None` if the bytes cannot be deserialized
// as a valid `WireMessage`.
//
// The `from` field is a required part of the wire protocol but is **not**
// used as the event source. Instead, `source` — the transport-level peer ID
// derived from the authenticated QUIC connection — is used so that consumers
// can pass it directly to `send_message()`. This eliminates a spoofing
// vector where a peer could claim an arbitrary identity via the payload.
//
// (This text used to be a `///` doc comment here, which made rustdoc attach
// the function's description to the constant below.)

/// Maximum allowed clock skew for message timestamps (5 minutes).
///
/// This is intentionally lenient for initial deployment to accommodate nodes with
/// misconfigured clocks or high-latency network conditions. Can be tightened (e.g., to 60s)
/// once the network stabilizes and node clock synchronization improves.
const MAX_MESSAGE_AGE_SECS: u64 = 300;
/// Maximum allowed future timestamp (30 seconds to account for clock drift)
const MAX_FUTURE_SECS: u64 = 30;
1418
1419/// Canonical conversion from PeerId string to adaptive NodeId for trust.
1420///
1421/// PeerId strings are hex-encoded 32-byte identifiers. This decodes them
1422/// back to raw bytes, matching the DHT NodeId representation used by
1423/// `trust_peer_selector`. Falls back to blake3 hash for non-hex IDs.
1424pub fn peer_id_to_trust_node_id(peer_id: &str) -> AdaptiveNodeId {
1425    if let Ok(bytes) = hex::decode(peer_id)
1426        && bytes.len() == 32
1427    {
1428        let mut arr = [0u8; 32];
1429        arr.copy_from_slice(&bytes);
1430        return AdaptiveNodeId::from_bytes(arr);
1431    }
1432    // Non-hex or wrong length: use canonical derivation
1433    let hash = crate::dht::derive_dht_key_from_peer_id(peer_id);
1434    AdaptiveNodeId::from_bytes(hash)
1435}
1436
1437/// Convenience constructor for `P2PError::Network(NetworkError::ProtocolError(...))`.
1438fn protocol_error(msg: impl std::fmt::Display) -> P2PError {
1439    P2PError::Network(NetworkError::ProtocolError(msg.to_string().into()))
1440}
1441
1442/// Helper to send an event via a broadcast sender, logging at trace level if no receivers.
1443pub(crate) fn broadcast_event(tx: &broadcast::Sender<P2PEvent>, event: P2PEvent) {
1444    if let Err(e) = tx.send(event) {
1445        tracing::trace!("Event broadcast has no receivers: {e}");
1446    }
1447}
1448
1449pub(crate) fn parse_protocol_message(bytes: &[u8], source: &str) -> Option<P2PEvent> {
1450    let message: WireMessage = postcard::from_bytes(bytes).ok()?;
1451
1452    // Validate timestamp to prevent replay attacks
1453    let now = std::time::SystemTime::now()
1454        .duration_since(std::time::UNIX_EPOCH)
1455        .map(|d| d.as_secs())
1456        .unwrap_or(0);
1457
1458    // Reject messages that are too old (potential replay)
1459    if message.timestamp < now.saturating_sub(MAX_MESSAGE_AGE_SECS) {
1460        tracing::warn!(
1461            "Rejecting stale message from {} (timestamp {} is {} seconds old)",
1462            source,
1463            message.timestamp,
1464            now.saturating_sub(message.timestamp)
1465        );
1466        return None;
1467    }
1468
1469    // Reject messages too far in the future (clock manipulation)
1470    if message.timestamp > now + MAX_FUTURE_SECS {
1471        tracing::warn!(
1472            "Rejecting future-dated message from {} (timestamp {} is {} seconds ahead)",
1473            source,
1474            message.timestamp,
1475            message.timestamp.saturating_sub(now)
1476        );
1477        return None;
1478    }
1479
1480    debug!(
1481        "Parsed P2PEvent::Message - topic: {}, source: {} (logical: {}), payload_len: {}",
1482        message.protocol,
1483        source,
1484        message.from,
1485        message.data.len()
1486    );
1487
1488    Some(P2PEvent::Message {
1489        topic: message.protocol,
1490        source: source.to_string(),
1491        data: message.data,
1492    })
1493}
1494
1495impl P2PNode {
    /// Subscribe to network events
    ///
    /// Returns a new broadcast receiver obtained from
    /// `TransportHandle::subscribe_events`.
    pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
        self.transport.subscribe_events()
    }
1500
    /// Backwards-compat event stream accessor for tests
    ///
    /// Identical to `subscribe_events()`.
    pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
        self.subscribe_events()
    }
1505
    /// Get node uptime
    ///
    /// Measured as the time elapsed since the node's `start_time` instant.
    pub fn uptime(&self) -> Duration {
        self.start_time.elapsed()
    }
1510
1511    // MCP removed: all MCP tool/service methods removed
1512
1513    // /// Handle MCP remote tool call with network integration
1514
1515    // /// List tools available on a specific remote peer
1516
1517    // /// Get MCP server statistics
1518
1519    /// Get production resource metrics
1520    pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
1521        if let Some(ref resource_manager) = self.resource_manager {
1522            Ok(resource_manager.get_metrics().await)
1523        } else {
1524            Err(protocol_error("Production resource manager not enabled"))
1525        }
1526    }
1527
1528    // Background tasks (connection_lifecycle_monitor, keepalive, periodic_maintenance)
1529    // are now implemented in TransportHandle.
1530
1531    /// Check system health
1532    pub async fn health_check(&self) -> Result<()> {
1533        if let Some(ref resource_manager) = self.resource_manager {
1534            resource_manager.health_check().await
1535        } else {
1536            // Basic health check without resource manager
1537            let peer_count = self.peer_count().await;
1538            if peer_count > self.config.max_connections {
1539                Err(protocol_error(format!(
1540                    "Too many connections: {peer_count}"
1541                )))
1542            } else {
1543                Ok(())
1544            }
1545        }
1546    }
1547
    /// Get production configuration (if enabled)
    ///
    /// Borrows `config.production_config`; `None` when the node configuration
    /// carries no production settings.
    pub fn production_config(&self) -> Option<&ProductionConfig> {
        self.config.production_config.as_ref()
    }
1552
    /// Check if production hardening is enabled
    ///
    /// True exactly when the node holds a resource manager.
    pub fn is_production_mode(&self) -> bool {
        self.resource_manager.is_some()
    }
1557
    /// Get the attached DHT manager.
    ///
    /// Returns a reference to the node's `Arc`; clone it for an owned handle.
    pub fn dht_manager(&self) -> &Arc<DhtNetworkManager> {
        &self.dht_manager
    }
1562
    /// Backwards-compatible alias for `dht_manager()`.
    ///
    /// Returns exactly what `dht_manager()` returns.
    pub fn dht(&self) -> &Arc<DhtNetworkManager> {
        self.dht_manager()
    }
1567
    /// Store a value in the local DHT
    ///
    /// This method stores data in the local DHT core through the attached manager.
    /// For network-wide replication across multiple nodes, use `DhtNetworkManager::put`.
    ///
    /// # Errors
    ///
    /// Returns whatever error `DhtNetworkManager::store_local` reports.
    pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
        self.dht_manager.store_local(key, value).await
    }
1575
    /// Retrieve a value from the local DHT
    ///
    /// This method retrieves data from the local DHT core through the attached manager.
    /// For network-wide lookups across multiple nodes, use `DhtNetworkManager::get`.
    ///
    /// # Returns
    ///
    /// The result of `DhtNetworkManager::get_local` for `key`; presumably
    /// `Ok(None)` when the key is not stored locally — confirm against the
    /// manager.
    pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
        self.dht_manager.get_local(&key).await
    }
1583
1584    /// Add a discovered peer to the bootstrap cache
1585    pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
1586        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1587            let manager = bootstrap_manager.write().await;
1588            let socket_addresses: Vec<std::net::SocketAddr> = addresses
1589                .iter()
1590                .filter_map(|addr| addr.parse().ok())
1591                .collect();
1592            let contact = ContactEntry::new(peer_id, socket_addresses);
1593            manager.add_contact(contact).await.map_err(|e| {
1594                protocol_error(format!("Failed to add peer to bootstrap cache: {e}"))
1595            })?;
1596        }
1597        Ok(())
1598    }
1599
1600    /// Update connection metrics for a peer in the bootstrap cache
1601    pub async fn update_peer_metrics(
1602        &self,
1603        peer_id: &PeerId,
1604        success: bool,
1605        latency_ms: Option<u64>,
1606        _error: Option<String>,
1607    ) -> Result<()> {
1608        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1609            let manager = bootstrap_manager.write().await;
1610
1611            // Create quality metrics based on the connection result
1612            let metrics = QualityMetrics {
1613                success_rate: if success { 1.0 } else { 0.0 },
1614                avg_latency_ms: latency_ms.unwrap_or(0) as f64,
1615                quality_score: if success {
1616                    BOOTSTRAP_QUALITY_SCORE_SUCCESS
1617                } else {
1618                    BOOTSTRAP_QUALITY_SCORE_FAILURE
1619                },
1620                last_connection_attempt: chrono::Utc::now(),
1621                last_successful_connection: if success {
1622                    chrono::Utc::now()
1623                } else {
1624                    chrono::Utc::now() - chrono::Duration::hours(BOOTSTRAP_FAILURE_PENALTY_HOURS)
1625                },
1626                uptime_score: BOOTSTRAP_DEFAULT_UPTIME_SCORE,
1627            };
1628
1629            manager
1630                .update_contact_metrics(peer_id, metrics)
1631                .await
1632                .map_err(|e| protocol_error(format!("Failed to update peer metrics: {e}")))?;
1633        }
1634        Ok(())
1635    }
1636
1637    /// Get bootstrap cache statistics
1638    pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
1639        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1640            let manager = bootstrap_manager.read().await;
1641            let stats = manager
1642                .get_stats()
1643                .await
1644                .map_err(|e| protocol_error(format!("Failed to get bootstrap stats: {e}")))?;
1645            Ok(Some(stats))
1646        } else {
1647            Ok(None)
1648        }
1649    }
1650
1651    /// Get the number of cached bootstrap peers
1652    pub async fn cached_peer_count(&self) -> usize {
1653        if let Some(ref _bootstrap_manager) = self.bootstrap_manager
1654            && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
1655        {
1656            return stats.total_contacts;
1657        }
1658        0
1659    }
1660
1661    /// Connect to bootstrap peers and perform initial peer discovery
1662    async fn connect_bootstrap_peers(&self) -> Result<()> {
1663        let mut bootstrap_contacts = Vec::new();
1664        let mut used_cache = false;
1665        let mut seen_addresses = std::collections::HashSet::new();
1666
1667        // CLI-provided bootstrap peers take priority - always include them first
1668        let cli_bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
1669            self.config.bootstrap_peers_str.clone()
1670        } else {
1671            // Convert Multiaddr to strings
1672            self.config
1673                .bootstrap_peers
1674                .iter()
1675                .map(|addr| addr.to_string())
1676                .collect::<Vec<_>>()
1677        };
1678
1679        if !cli_bootstrap_peers.is_empty() {
1680            info!(
1681                "Using {} CLI-provided bootstrap peers (priority)",
1682                cli_bootstrap_peers.len()
1683            );
1684            for addr in &cli_bootstrap_peers {
1685                if let Ok(socket_addr) = addr.parse::<std::net::SocketAddr>() {
1686                    seen_addresses.insert(socket_addr);
1687                    let contact = ContactEntry::new(
1688                        format!("cli_peer_{}", addr.chars().take(8).collect::<String>()),
1689                        vec![socket_addr],
1690                    );
1691                    bootstrap_contacts.push(contact);
1692                } else {
1693                    warn!("Invalid bootstrap address format: {}", addr);
1694                }
1695            }
1696        }
1697
1698        // Supplement with cached bootstrap peers (after CLI peers)
1699        // Use QUIC-specific peer selection since we're using ant-quic transport
1700        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1701            let manager = bootstrap_manager.read().await;
1702            match manager
1703                .get_quic_bootstrap_peers(BOOTSTRAP_PEER_BATCH_SIZE)
1704                .await
1705            {
1706                // Try to get top 20 quality QUIC-enabled peers
1707                Ok(contacts) => {
1708                    if !contacts.is_empty() {
1709                        let mut added_from_cache = 0;
1710                        for contact in contacts {
1711                            // Only add if we haven't already added this address from CLI
1712                            let new_addresses: Vec<_> = contact
1713                                .addresses
1714                                .iter()
1715                                .filter(|addr| !seen_addresses.contains(addr))
1716                                .copied()
1717                                .collect();
1718
1719                            if !new_addresses.is_empty() {
1720                                for addr in &new_addresses {
1721                                    seen_addresses.insert(*addr);
1722                                }
1723                                let mut contact = contact.clone();
1724                                contact.addresses = new_addresses;
1725                                bootstrap_contacts.push(contact);
1726                                added_from_cache += 1;
1727                            }
1728                        }
1729                        if added_from_cache > 0 {
1730                            info!(
1731                                "Added {} cached bootstrap peers (supplementing CLI peers)",
1732                                added_from_cache
1733                            );
1734                            used_cache = true;
1735                        }
1736                    }
1737                }
1738                Err(e) => {
1739                    warn!("Failed to get cached bootstrap peers: {}", e);
1740                }
1741            }
1742        }
1743
1744        if bootstrap_contacts.is_empty() {
1745            info!("No bootstrap peers configured and no cached peers available");
1746            return Ok(());
1747        }
1748
1749        // Connect to bootstrap peers and perform peer discovery
1750        let mut successful_connections = 0;
1751        let mut connected_peer_ids: Vec<PeerId> = Vec::new();
1752
1753        for contact in bootstrap_contacts.iter() {
1754            for addr in &contact.addresses {
1755                match self.connect_peer(&addr.to_string()).await {
1756                    Ok(peer_id) => {
1757                        successful_connections += 1;
1758                        connected_peer_ids.push(peer_id.clone());
1759
1760                        // Update bootstrap cache with successful connection
1761                        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1762                            let manager = bootstrap_manager.write().await;
1763                            let mut updated_contact = contact.clone();
1764                            updated_contact.peer_id = peer_id.clone();
1765                            updated_contact.update_connection_result(true, Some(100), None); // Assume 100ms latency for now
1766
1767                            if let Err(e) = manager.add_contact(updated_contact).await {
1768                                warn!("Failed to update bootstrap cache: {}", e);
1769                            }
1770                        }
1771                        break; // Successfully connected, move to next contact
1772                    }
1773                    Err(e) => {
1774                        warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
1775
1776                        // Update bootstrap cache with failed connection
1777                        if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
1778                            let manager = bootstrap_manager.write().await;
1779                            let mut updated_contact = contact.clone();
1780                            updated_contact.update_connection_result(
1781                                false,
1782                                None,
1783                                Some(e.to_string()),
1784                            );
1785
1786                            if let Err(e) = manager.add_contact(updated_contact).await {
1787                                warn!("Failed to update bootstrap cache: {}", e);
1788                            }
1789                        }
1790                    }
1791                }
1792            }
1793        }
1794
1795        if successful_connections == 0 {
1796            if !used_cache {
1797                warn!("Failed to connect to any bootstrap peers");
1798            }
1799            // Starting a node should not be gated on immediate bootstrap connectivity.
1800            // Keep running and allow background discovery / retries to populate peers later.
1801            return Ok(());
1802        }
1803
1804        info!(
1805            "Successfully connected to {} bootstrap peers",
1806            successful_connections
1807        );
1808
1809        // Perform DHT peer discovery from connected bootstrap peers
1810        // Uses the DHT manager's postcard protocol for correct deserialization
1811        match self
1812            .dht_manager
1813            .bootstrap_from_peers(&connected_peer_ids)
1814            .await
1815        {
1816            Ok(count) => info!("DHT peer discovery found {} peers", count),
1817            Err(e) => warn!("DHT peer discovery failed: {}", e),
1818        }
1819
1820        // Mark node as bootstrapped - we have connected to bootstrap peers
1821        // and initiated peer discovery
1822        self.is_bootstrapped.store(true, Ordering::SeqCst);
1823        info!(
1824            "Bootstrap complete: connected to {} peers, initiated {} discovery requests",
1825            successful_connections,
1826            connected_peer_ids.len()
1827        );
1828
1829        Ok(())
1830    }
1831
1832    // disconnect_all_peers and periodic_tasks are now in TransportHandle
1833}
1834
/// Abstraction over anything that can send messages to peers.
///
/// Implementors must be `Send + Sync`. The trait is declared through
/// `async_trait`, which is what allows the `async fn` in the trait body.
#[async_trait::async_trait]
pub trait NetworkSender: Send + Sync {
    /// Send `data` to the peer identified by `peer_id` under the given
    /// `protocol` identifier.
    ///
    /// # Errors
    ///
    /// Returns an error if the implementation cannot deliver the message;
    /// the exact failure conditions are defined by each implementor.
    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;

    /// Borrow the peer ID of the local node.
    fn local_peer_id(&self) -> &PeerId;
}
1844
1845// P2PNetworkSender removed — NetworkSender is now implemented directly on TransportHandle.
1846
/// Builder for creating P2P nodes.
///
/// Wraps a `NodeConfig` that the builder methods adjust step by step before
/// it is used to construct the node.
pub struct NodeBuilder {
    /// Configuration accumulated so far.
    config: NodeConfig,
}
1851
impl Default for NodeBuilder {
    /// Equivalent to [`NodeBuilder::new`].
    fn default() -> Self {
        Self::new()
    }
}
1857
1858impl NodeBuilder {
1859    /// Create a new node builder
1860    pub fn new() -> Self {
1861        Self {
1862            config: NodeConfig::default(),
1863        }
1864    }
1865
1866    /// Set the peer ID
1867    pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
1868        self.config.peer_id = Some(peer_id);
1869        self
1870    }
1871
1872    /// Add a listen address
1873    pub fn listen_on(mut self, addr: &str) -> Self {
1874        if let Ok(multiaddr) = addr.parse() {
1875            self.config.listen_addrs.push(multiaddr);
1876        }
1877        self
1878    }
1879
1880    /// Add a bootstrap peer
1881    pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
1882        if let Ok(multiaddr) = addr.parse() {
1883            self.config.bootstrap_peers.push(multiaddr);
1884        }
1885        self.config.bootstrap_peers_str.push(addr.to_string());
1886        self
1887    }
1888
1889    /// Enable IPv6 support
1890    pub fn with_ipv6(mut self, enable: bool) -> Self {
1891        self.config.enable_ipv6 = enable;
1892        self
1893    }
1894
1895    // MCP removed: builder methods deleted
1896
1897    /// Set connection timeout
1898    pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
1899        self.config.connection_timeout = timeout;
1900        self
1901    }
1902
1903    /// Set maximum connections
1904    pub fn with_max_connections(mut self, max: usize) -> Self {
1905        self.config.max_connections = max;
1906        self
1907    }
1908
1909    /// Enable production mode with default configuration
1910    pub fn with_production_mode(mut self) -> Self {
1911        self.config.production_config = Some(ProductionConfig::default());
1912        self
1913    }
1914
1915    /// Configure production settings
1916    pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
1917        self.config.production_config = Some(production_config);
1918        self
1919    }
1920
1921    /// Configure IP diversity limits for Sybil protection.
1922    pub fn with_diversity_config(
1923        mut self,
1924        diversity_config: crate::security::IPDiversityConfig,
1925    ) -> Self {
1926        self.config.diversity_config = Some(diversity_config);
1927        self
1928    }
1929
1930    /// Configure DHT settings
1931    pub fn with_dht(mut self, dht_config: DHTConfig) -> Self {
1932        self.config.dht_config = dht_config;
1933        self
1934    }
1935
1936    /// Enable DHT with default configuration
1937    pub fn with_default_dht(mut self) -> Self {
1938        self.config.dht_config = DHTConfig::default();
1939        self
1940    }
1941
1942    /// Build the P2P node
1943    pub async fn build(self) -> Result<P2PNode> {
1944        P2PNode::new(self.config).await
1945    }
1946}
1947
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod diversity_tests {
    use super::*;
    use crate::security::IPDiversityConfig;

    /// Build a `BootstrapManager` from the diversity and cache settings of
    /// `config`, with the cache directory redirected to a fresh temp dir.
    ///
    /// The `TempDir` guard is returned alongside the manager: the directory
    /// is deleted when the guard is dropped, so the caller must keep it alive
    /// for as long as the manager is in use.
    async fn build_bootstrap_manager_like_prod(
        config: &NodeConfig,
    ) -> (BootstrapManager, tempfile::TempDir) {
        let diversity_config = config.diversity_config.clone().unwrap_or_default();
        // Use a temp dir to avoid conflicts with cached files from old format
        let temp_dir = tempfile::TempDir::new().expect("temp dir");
        let mut cache_config = config.bootstrap_cache_config.clone().unwrap_or_default();
        cache_config.cache_dir = temp_dir.path().to_path_buf();

        let manager = BootstrapManager::with_full_config(
            cache_config,
            crate::rate_limit::JoinRateLimiterConfig::default(),
            diversity_config,
        )
        .await
        .expect("bootstrap manager");

        (manager, temp_dir)
    }

    /// The diversity config set on `NodeConfig` is the one the bootstrap
    /// manager ends up with.
    #[tokio::test]
    async fn test_nodeconfig_diversity_config_used_for_bootstrap() {
        let config = NodeConfig {
            diversity_config: Some(IPDiversityConfig::testnet()),
            ..Default::default()
        };

        // `_cache_dir` keeps the temporary cache directory alive until the
        // end of the test.
        let (manager, _cache_dir) = build_bootstrap_manager_like_prod(&config).await;
        assert!(manager.diversity_config().is_relaxed());
        assert_eq!(manager.diversity_config().max_nodes_per_asn, 5000);
    }
}
1982
1983/// Helper function to register a new peer
1984pub(crate) async fn register_new_peer(
1985    peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
1986    peer_id: &PeerId,
1987    remote_addr: &NetworkAddress,
1988) {
1989    let mut peers_guard = peers.write().await;
1990    let peer_info = PeerInfo {
1991        peer_id: peer_id.clone(),
1992        addresses: vec![remote_addr.to_string()],
1993        connected_at: tokio::time::Instant::now(),
1994        last_seen: tokio::time::Instant::now(),
1995        status: ConnectionStatus::Connected,
1996        protocols: vec!["p2p-core/1.0.0".to_string()],
1997        heartbeat_count: 0,
1998    };
1999    peers_guard.insert(peer_id.clone(), peer_info);
2000}
2001
2002#[cfg(test)]
2003mod tests {
2004    use super::*;
2005    // MCP removed from tests
2006    use std::time::Duration;
2007    use tokio::time::timeout;
2008
    // The MCP test tool handler that used to live here was removed together
    // with MCP support.
2012
2013    /// Helper function to create a test node configuration
2014    fn create_test_node_config() -> NodeConfig {
2015        NodeConfig {
2016            peer_id: Some("test_peer_123".to_string()),
2017            listen_addrs: vec![
2018                std::net::SocketAddr::new(std::net::IpAddr::V6(std::net::Ipv6Addr::LOCALHOST), 0),
2019                std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
2020            ],
2021            listen_addr: std::net::SocketAddr::new(
2022                std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
2023                0,
2024            ),
2025            bootstrap_peers: vec![],
2026            bootstrap_peers_str: vec![],
2027            enable_ipv6: true,
2028
2029            connection_timeout: Duration::from_secs(2),
2030            keep_alive_interval: Duration::from_secs(30),
2031            max_connections: 100,
2032            max_incoming_connections: 50,
2033            dht_config: DHTConfig::default(),
2034            security_config: SecurityConfig::default(),
2035            production_config: None,
2036            bootstrap_cache_config: None,
2037            diversity_config: None,
2038            stale_peer_threshold: default_stale_peer_threshold(),
2039            max_message_size: None,
2040        }
2041    }
2042
2043    /// Helper function to create a test tool
2044    // MCP removed: test tool helper deleted
2045
2046    #[tokio::test]
2047    async fn test_node_config_default() {
2048        let config = NodeConfig::default();
2049
2050        assert!(config.peer_id.is_none());
2051        assert_eq!(config.listen_addrs.len(), 2);
2052        assert!(config.enable_ipv6);
2053        assert_eq!(config.max_connections, 10000); // Fixed: matches actual default
2054        assert_eq!(config.max_incoming_connections, 100);
2055        assert_eq!(config.connection_timeout, Duration::from_secs(30));
2056    }
2057
2058    #[tokio::test]
2059    async fn test_dht_config_default() {
2060        let config = DHTConfig::default();
2061
2062        assert_eq!(config.k_value, 20);
2063        assert_eq!(config.alpha_value, 5);
2064        assert_eq!(config.record_ttl, Duration::from_secs(3600));
2065        assert_eq!(config.refresh_interval, Duration::from_secs(600));
2066    }
2067
2068    #[tokio::test]
2069    async fn test_security_config_default() {
2070        let config = SecurityConfig::default();
2071
2072        assert!(config.enable_noise);
2073        assert!(config.enable_tls);
2074        assert_eq!(config.trust_level, TrustLevel::Basic);
2075    }
2076
2077    #[test]
2078    fn test_trust_level_variants() {
2079        // Test that all trust level variants can be created
2080        let _none = TrustLevel::None;
2081        let _basic = TrustLevel::Basic;
2082        let _full = TrustLevel::Full;
2083
2084        // Test equality
2085        assert_eq!(TrustLevel::None, TrustLevel::None);
2086        assert_eq!(TrustLevel::Basic, TrustLevel::Basic);
2087        assert_eq!(TrustLevel::Full, TrustLevel::Full);
2088        assert_ne!(TrustLevel::None, TrustLevel::Basic);
2089    }
2090
2091    #[test]
2092    fn test_connection_status_variants() {
2093        let connecting = ConnectionStatus::Connecting;
2094        let connected = ConnectionStatus::Connected;
2095        let disconnecting = ConnectionStatus::Disconnecting;
2096        let disconnected = ConnectionStatus::Disconnected;
2097        let failed = ConnectionStatus::Failed("test error".to_string());
2098
2099        assert_eq!(connecting, ConnectionStatus::Connecting);
2100        assert_eq!(connected, ConnectionStatus::Connected);
2101        assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
2102        assert_eq!(disconnected, ConnectionStatus::Disconnected);
2103        assert_ne!(connecting, connected);
2104
2105        if let ConnectionStatus::Failed(msg) = failed {
2106            assert_eq!(msg, "test error");
2107        } else {
2108            panic!("Expected Failed status");
2109        }
2110    }
2111
2112    #[tokio::test]
2113    async fn test_node_creation() -> Result<()> {
2114        let config = create_test_node_config();
2115        let node = P2PNode::new(config).await?;
2116
2117        assert_eq!(node.peer_id(), "test_peer_123");
2118        assert!(!node.is_running());
2119        assert_eq!(node.peer_count().await, 0);
2120        assert!(node.connected_peers().await.is_empty());
2121
2122        Ok(())
2123    }
2124
2125    #[tokio::test]
2126    async fn test_node_creation_without_peer_id() -> Result<()> {
2127        let mut config = create_test_node_config();
2128        config.peer_id = None;
2129
2130        let node = P2PNode::new(config).await?;
2131
2132        // Should have generated a peer ID
2133        assert!(node.peer_id().starts_with("peer_"));
2134        assert!(!node.is_running());
2135
2136        Ok(())
2137    }
2138
2139    #[tokio::test]
2140    async fn test_node_lifecycle() -> Result<()> {
2141        let config = create_test_node_config();
2142        let node = P2PNode::new(config).await?;
2143
2144        // Initially not running
2145        assert!(!node.is_running());
2146
2147        // Start the node
2148        node.start().await?;
2149        assert!(node.is_running());
2150
2151        // Check listen addresses were set (at least one)
2152        let listen_addrs = node.listen_addrs().await;
2153        assert!(
2154            !listen_addrs.is_empty(),
2155            "Expected at least one listening address"
2156        );
2157
2158        // Stop the node
2159        node.stop().await?;
2160        assert!(!node.is_running());
2161
2162        Ok(())
2163    }
2164
    /// End-to-end connection test between two local nodes: node1 connects to
    /// node2 over an IPv4 listen address, the peer bookkeeping on node1 is
    /// checked, and the peer is disconnected again.
    #[tokio::test]
    async fn test_peer_connection() -> Result<()> {
        let config1 = create_test_node_config();
        let mut config2 = create_test_node_config();
        // Give the second node its own identity.
        config2.peer_id = Some("test_peer_456".to_string());

        let node1 = P2PNode::new(config1).await?;
        let node2 = P2PNode::new(config2).await?;

        node1.start().await?;
        node2.start().await?;

        // Pick node2's IPv4 listen address; fail the test if it has none.
        let node2_addr = node2
            .listen_addrs()
            .await
            .into_iter()
            .find(|a| a.ip().is_ipv4())
            .ok_or_else(|| {
                P2PError::Network(crate::error::NetworkError::InvalidAddress(
                    "Node 2 did not expose an IPv4 listen address".into(),
                ))
            })?;

        // Connect to a real peer
        let peer_id = node1.connect_peer(&node2_addr.to_string()).await?;

        // Check peer count
        assert_eq!(node1.peer_count().await, 1);

        // Check connected peers
        let connected_peers = node1.connected_peers().await;
        assert_eq!(connected_peers.len(), 1);
        assert_eq!(connected_peers[0], peer_id);

        // Get peer info
        let peer_info = node1.peer_info(&peer_id).await;
        assert!(peer_info.is_some());
        let info = peer_info.expect("Peer info should exist after adding peer");
        assert_eq!(info.peer_id, peer_id);
        assert_eq!(info.status, ConnectionStatus::Connected);
        // NOTE(review): `register_new_peer` in this file records the protocol
        // "p2p-core/1.0.0"; confirm which code path populates `protocols` for
        // outbound connections so that this expectation stays accurate.
        assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));

        // Disconnect from peer
        node1.disconnect_peer(&peer_id).await?;
        assert_eq!(node1.peer_count().await, 0);

        node1.stop().await?;
        node2.stop().await?;

        Ok(())
    }
2216
    // TODO(windows): Investigate QUIC connection issues on Windows CI
    // This test consistently fails on Windows GitHub Actions runners with
    // "All connect attempts failed" even with IPv4-only config, long delays,
    // and multiple retry attempts. The underlying ant-quic library may have
    // issues on Windows that need investigation.
    // See: https://github.com/dirvine/saorsa-core/issues/TBD
    /// Subscribing to node events yields a `PeerConnected` event after a
    /// successful connect and a `PeerDisconnected` event after disconnecting,
    /// each carrying the id of the peer involved.
    #[cfg_attr(target_os = "windows", ignore)]
    #[tokio::test]
    async fn test_event_subscription() -> Result<()> {
        // Configure both nodes to use only IPv4 for reliable cross-platform testing
        // This is important because:
        // 1. local_addr() returns the first address from listen_addrs
        // 2. The default config puts IPv6 first, which may not work on all Windows setups
        let ipv4_localhost =
            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);

        let mut config1 = create_test_node_config();
        config1.listen_addr = ipv4_localhost;
        config1.listen_addrs = vec![ipv4_localhost];
        config1.enable_ipv6 = false;

        let mut config2 = create_test_node_config();
        // Second node needs its own identity.
        config2.peer_id = Some("test_peer_456".to_string());
        config2.listen_addr = ipv4_localhost;
        config2.listen_addrs = vec![ipv4_localhost];
        config2.enable_ipv6 = false;

        let node1 = P2PNode::new(config1).await?;
        let node2 = P2PNode::new(config2).await?;

        node1.start().await?;
        node2.start().await?;

        // Wait for nodes to fully bind their listening sockets
        // Windows network stack initialization can be significantly slower
        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;

        // Subscribe before connecting so the connect event is not missed.
        let mut events = node1.subscribe_events();

        // Get the actual listening address using local_addr() for reliability
        let node2_addr = node2.local_addr().ok_or_else(|| {
            P2PError::Network(crate::error::NetworkError::ProtocolError(
                "No listening address".to_string().into(),
            ))
        })?;

        // Connect to a peer with retry logic for Windows reliability
        // The QUIC library may need additional time to fully initialize
        let mut peer_id = None;
        for attempt in 0..3 {
            // Back off briefly before every retry (not before the first try).
            if attempt > 0 {
                tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
            }
            match timeout(Duration::from_secs(2), node1.connect_peer(&node2_addr)).await {
                Ok(Ok(id)) => {
                    peer_id = Some(id);
                    break;
                }
                // Both a connect error and a timeout lead to another attempt.
                Ok(Err(_)) | Err(_) => continue,
            }
        }
        let peer_id = peer_id.ok_or_else(|| {
            P2PError::Network(crate::error::NetworkError::ProtocolError(
                "Failed to connect after 3 attempts".to_string().into(),
            ))
        })?;

        // Check for PeerConnected event
        let event = timeout(Duration::from_secs(2), events.recv()).await;
        assert!(event.is_ok());

        // Outer `expect`: the timeout did not elapse; inner `expect`: the
        // receive itself succeeded.
        let event_result = event
            .expect("Should receive event")
            .expect("Event should not be error");
        match event_result {
            P2PEvent::PeerConnected(event_peer_id) => {
                assert_eq!(event_peer_id, peer_id);
            }
            _ => panic!("Expected PeerConnected event"),
        }

        // Disconnect from peer (this should emit another event)
        node1.disconnect_peer(&peer_id).await?;

        // Check for PeerDisconnected event
        let event = timeout(Duration::from_secs(2), events.recv()).await;
        assert!(event.is_ok());

        let event_result = event
            .expect("Should receive event")
            .expect("Event should not be error");
        match event_result {
            P2PEvent::PeerDisconnected(event_peer_id) => {
                assert_eq!(event_peer_id, peer_id);
            }
            _ => panic!("Expected PeerDisconnected event"),
        }

        node1.stop().await?;
        node2.stop().await?;

        Ok(())
    }
2320
2321    // TODO(windows): Same QUIC connection issues as test_event_subscription
2322    #[cfg_attr(target_os = "windows", ignore)]
2323    #[tokio::test]
2324    async fn test_message_sending() -> Result<()> {
2325        // Create two nodes
2326        let mut config1 = create_test_node_config();
2327        config1.listen_addr =
2328            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2329        let node1 = P2PNode::new(config1).await?;
2330        node1.start().await?;
2331
2332        let mut config2 = create_test_node_config();
2333        config2.listen_addr =
2334            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2335        let node2 = P2PNode::new(config2).await?;
2336        node2.start().await?;
2337
2338        // Wait a bit for nodes to start listening
2339        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
2340
2341        // Get actual listening address of node2
2342        let node2_addr = node2.local_addr().ok_or_else(|| {
2343            P2PError::Network(crate::error::NetworkError::ProtocolError(
2344                "No listening address".to_string().into(),
2345            ))
2346        })?;
2347
2348        // Connect node1 to node2
2349        let peer_id =
2350            match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
2351                Ok(res) => res?,
2352                Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2353            };
2354
2355        // Wait a bit for connection to establish
2356        tokio::time::sleep(tokio::time::Duration::from_millis(30)).await;
2357
2358        // Send a message
2359        let message_data = b"Hello, peer!".to_vec();
2360        let result = match timeout(
2361            Duration::from_millis(500),
2362            node1.send_message(&peer_id, "test-protocol", message_data),
2363        )
2364        .await
2365        {
2366            Ok(res) => res,
2367            Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2368        };
2369        // For now, we'll just check that we don't get a "not connected" error
2370        // The actual send might fail due to no handler on the other side
2371        if let Err(e) = &result {
2372            assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
2373        }
2374
2375        // Try to send to non-existent peer
2376        let non_existent_peer = "non_existent_peer".to_string();
2377        let result = node1
2378            .send_message(&non_existent_peer, "test-protocol", vec![])
2379            .await;
2380        assert!(result.is_err(), "Sending to non-existent peer should fail");
2381
2382        Ok(())
2383    }
2384
2385    #[tokio::test]
2386    async fn test_remote_mcp_operations() -> Result<()> {
2387        let config = create_test_node_config();
2388        let node = P2PNode::new(config).await?;
2389
2390        // MCP removed; test reduced to simple start/stop
2391        node.start().await?;
2392        node.stop().await?;
2393        Ok(())
2394    }
2395
2396    #[tokio::test]
2397    async fn test_health_check() -> Result<()> {
2398        let config = create_test_node_config();
2399        let node = P2PNode::new(config).await?;
2400
2401        // Health check should pass with no connections
2402        let result = node.health_check().await;
2403        assert!(result.is_ok());
2404
2405        // Note: We're not actually connecting to real peers here
2406        // since that would require running bootstrap nodes.
2407        // The health check should still pass with no connections.
2408
2409        Ok(())
2410    }
2411
2412    #[tokio::test]
2413    async fn test_node_uptime() -> Result<()> {
2414        let config = create_test_node_config();
2415        let node = P2PNode::new(config).await?;
2416
2417        let uptime1 = node.uptime();
2418        assert!(uptime1 >= Duration::from_secs(0));
2419
2420        // Wait a bit
2421        tokio::time::sleep(Duration::from_millis(10)).await;
2422
2423        let uptime2 = node.uptime();
2424        assert!(uptime2 > uptime1);
2425
2426        Ok(())
2427    }
2428
2429    #[tokio::test]
2430    async fn test_node_config_access() -> Result<()> {
2431        let config = create_test_node_config();
2432        let expected_peer_id = config.peer_id.clone();
2433        let node = P2PNode::new(config).await?;
2434
2435        let node_config = node.config();
2436        assert_eq!(node_config.peer_id, expected_peer_id);
2437        assert_eq!(node_config.max_connections, 100);
2438        // MCP removed
2439
2440        Ok(())
2441    }
2442
2443    #[tokio::test]
2444    async fn test_mcp_server_access() -> Result<()> {
2445        let config = create_test_node_config();
2446        let _node = P2PNode::new(config).await?;
2447
2448        // MCP removed
2449        Ok(())
2450    }
2451
2452    #[tokio::test]
2453    async fn test_dht_access() -> Result<()> {
2454        let config = create_test_node_config();
2455        let node = P2PNode::new(config).await?;
2456
2457        // DHT is always available
2458        let _dht = node.dht();
2459
2460        Ok(())
2461    }
2462
2463    #[tokio::test]
2464    async fn test_node_builder() -> Result<()> {
2465        // Create a config using the builder but don't actually build a real node
2466        let builder = P2PNode::builder()
2467            .with_peer_id("builder_test_peer".to_string())
2468            .listen_on("/ip4/127.0.0.1/tcp/0")
2469            .listen_on("/ip6/::1/tcp/0")
2470            .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9000") // Use a valid port number
2471            .with_ipv6(true)
2472            .with_connection_timeout(Duration::from_secs(15))
2473            .with_max_connections(200);
2474
2475        // Test the configuration that was built
2476        let config = builder.config;
2477        assert_eq!(config.peer_id, Some("builder_test_peer".to_string()));
2478        assert_eq!(config.listen_addrs.len(), 2); // 2 added by builder (no defaults)
2479        assert_eq!(config.bootstrap_peers_str.len(), 1); // Check bootstrap_peers_str instead
2480        assert!(config.enable_ipv6);
2481        assert_eq!(config.connection_timeout, Duration::from_secs(15));
2482        assert_eq!(config.max_connections, 200);
2483
2484        Ok(())
2485    }
2486
2487    #[tokio::test]
2488    async fn test_bootstrap_peers() -> Result<()> {
2489        let mut config = create_test_node_config();
2490        config.bootstrap_peers = vec![
2491            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9200),
2492            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9201),
2493        ];
2494
2495        let node = P2PNode::new(config).await?;
2496
2497        // Start node (which attempts to connect to bootstrap peers)
2498        node.start().await?;
2499
2500        // In a test environment, bootstrap peers may not be available
2501        // The test verifies the node starts correctly with bootstrap configuration
2502        // Peer count may include local/internal tracking, so we just verify it's reasonable
2503        let _peer_count = node.peer_count().await;
2504
2505        node.stop().await?;
2506        Ok(())
2507    }
2508
2509    #[tokio::test]
2510    async fn test_production_mode_disabled() -> Result<()> {
2511        let config = create_test_node_config();
2512        let node = P2PNode::new(config).await?;
2513
2514        assert!(!node.is_production_mode());
2515        assert!(node.production_config().is_none());
2516
2517        // Resource metrics should fail when production mode is disabled
2518        let result = node.resource_metrics().await;
2519        assert!(result.is_err());
2520        assert!(result.unwrap_err().to_string().contains("not enabled"));
2521
2522        Ok(())
2523    }
2524
2525    #[tokio::test]
2526    async fn test_network_event_variants() {
2527        // Test that all network event variants can be created
2528        let peer_id = "test_peer".to_string();
2529        let address = "/ip4/127.0.0.1/tcp/9000".to_string();
2530
2531        let _peer_connected = NetworkEvent::PeerConnected {
2532            peer_id: peer_id.clone(),
2533            addresses: vec![address.clone()],
2534        };
2535
2536        let _peer_disconnected = NetworkEvent::PeerDisconnected {
2537            peer_id: peer_id.clone(),
2538            reason: "test disconnect".to_string(),
2539        };
2540
2541        let _message_received = NetworkEvent::MessageReceived {
2542            peer_id: peer_id.clone(),
2543            protocol: "test-protocol".to_string(),
2544            data: vec![1, 2, 3],
2545        };
2546
2547        let _connection_failed = NetworkEvent::ConnectionFailed {
2548            peer_id: Some(peer_id.clone()),
2549            address: address.clone(),
2550            error: "connection refused".to_string(),
2551        };
2552
2553        let _dht_stored = NetworkEvent::DHTRecordStored {
2554            key: vec![1, 2, 3],
2555            value: vec![4, 5, 6],
2556        };
2557
2558        let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
2559            key: vec![1, 2, 3],
2560            value: Some(vec![4, 5, 6]),
2561        };
2562    }
2563
2564    #[tokio::test]
2565    async fn test_peer_info_structure() {
2566        let peer_info = PeerInfo {
2567            peer_id: "test_peer".to_string(),
2568            addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
2569            connected_at: Instant::now(),
2570            last_seen: Instant::now(),
2571            status: ConnectionStatus::Connected,
2572            protocols: vec!["test-protocol".to_string()],
2573            heartbeat_count: 0,
2574        };
2575
2576        assert_eq!(peer_info.peer_id, "test_peer");
2577        assert_eq!(peer_info.addresses.len(), 1);
2578        assert_eq!(peer_info.status, ConnectionStatus::Connected);
2579        assert_eq!(peer_info.protocols.len(), 1);
2580    }
2581
2582    #[tokio::test]
2583    async fn test_serialization() -> Result<()> {
2584        // Test that configs can be serialized/deserialized
2585        let config = create_test_node_config();
2586        let serialized = serde_json::to_string(&config)?;
2587        let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
2588
2589        assert_eq!(config.peer_id, deserialized.peer_id);
2590        assert_eq!(config.listen_addrs, deserialized.listen_addrs);
2591        assert_eq!(config.enable_ipv6, deserialized.enable_ipv6);
2592
2593        Ok(())
2594    }
2595
2596    #[tokio::test]
2597    async fn test_get_peer_id_by_address_found() -> Result<()> {
2598        let config = create_test_node_config();
2599        let node = P2PNode::new(config).await?;
2600
2601        // Manually insert a peer for testing
2602        let test_peer_id = "peer_test_123".to_string();
2603        let test_address = "192.168.1.100:9000".to_string();
2604
2605        let peer_info = PeerInfo {
2606            peer_id: test_peer_id.clone(),
2607            addresses: vec![test_address.clone()],
2608            connected_at: Instant::now(),
2609            last_seen: Instant::now(),
2610            status: ConnectionStatus::Connected,
2611            protocols: vec!["test-protocol".to_string()],
2612            heartbeat_count: 0,
2613        };
2614
2615        node.transport
2616            .inject_peer(test_peer_id.clone(), peer_info)
2617            .await;
2618
2619        // Test: Find peer by address
2620        let found_peer_id = node.get_peer_id_by_address(&test_address).await;
2621        assert_eq!(found_peer_id, Some(test_peer_id));
2622
2623        Ok(())
2624    }
2625
2626    #[tokio::test]
2627    async fn test_get_peer_id_by_address_not_found() -> Result<()> {
2628        let config = create_test_node_config();
2629        let node = P2PNode::new(config).await?;
2630
2631        // Test: Try to find a peer that doesn't exist
2632        let result = node.get_peer_id_by_address("192.168.1.200:9000").await;
2633        assert_eq!(result, None);
2634
2635        Ok(())
2636    }
2637
2638    #[tokio::test]
2639    async fn test_get_peer_id_by_address_invalid_format() -> Result<()> {
2640        let config = create_test_node_config();
2641        let node = P2PNode::new(config).await?;
2642
2643        // Test: Invalid address format should return None
2644        let result = node.get_peer_id_by_address("invalid-address").await;
2645        assert_eq!(result, None);
2646
2647        Ok(())
2648    }
2649
2650    #[tokio::test]
2651    async fn test_get_peer_id_by_address_multiple_peers() -> Result<()> {
2652        let config = create_test_node_config();
2653        let node = P2PNode::new(config).await?;
2654
2655        // Add multiple peers with different addresses
2656        let peer1_id = "peer_1".to_string();
2657        let peer1_addr = "192.168.1.101:9001".to_string();
2658
2659        let peer2_id = "peer_2".to_string();
2660        let peer2_addr = "192.168.1.102:9002".to_string();
2661
2662        let peer1_info = PeerInfo {
2663            peer_id: peer1_id.clone(),
2664            addresses: vec![peer1_addr.clone()],
2665            connected_at: Instant::now(),
2666            last_seen: Instant::now(),
2667            status: ConnectionStatus::Connected,
2668            protocols: vec!["test-protocol".to_string()],
2669            heartbeat_count: 0,
2670        };
2671
2672        let peer2_info = PeerInfo {
2673            peer_id: peer2_id.clone(),
2674            addresses: vec![peer2_addr.clone()],
2675            connected_at: Instant::now(),
2676            last_seen: Instant::now(),
2677            status: ConnectionStatus::Connected,
2678            protocols: vec!["test-protocol".to_string()],
2679            heartbeat_count: 0,
2680        };
2681
2682        node.transport
2683            .inject_peer(peer1_id.clone(), peer1_info)
2684            .await;
2685        node.transport
2686            .inject_peer(peer2_id.clone(), peer2_info)
2687            .await;
2688
2689        // Test: Find each peer by their unique address
2690        let found_peer1 = node.get_peer_id_by_address(&peer1_addr).await;
2691        let found_peer2 = node.get_peer_id_by_address(&peer2_addr).await;
2692
2693        assert_eq!(found_peer1, Some(peer1_id));
2694        assert_eq!(found_peer2, Some(peer2_id));
2695
2696        Ok(())
2697    }
2698
2699    #[tokio::test]
2700    async fn test_list_active_connections_empty() -> Result<()> {
2701        let config = create_test_node_config();
2702        let node = P2PNode::new(config).await?;
2703
2704        // Test: No connections initially
2705        let connections = node.list_active_connections().await;
2706        assert!(connections.is_empty());
2707
2708        Ok(())
2709    }
2710
2711    #[tokio::test]
2712    async fn test_list_active_connections_with_peers() -> Result<()> {
2713        let config = create_test_node_config();
2714        let node = P2PNode::new(config).await?;
2715
2716        // Add multiple peers
2717        let peer1_id = "peer_1".to_string();
2718        let peer1_addrs = vec![
2719            "192.168.1.101:9001".to_string(),
2720            "192.168.1.101:9002".to_string(),
2721        ];
2722
2723        let peer2_id = "peer_2".to_string();
2724        let peer2_addrs = vec!["192.168.1.102:9003".to_string()];
2725
2726        let peer1_info = PeerInfo {
2727            peer_id: peer1_id.clone(),
2728            addresses: peer1_addrs.clone(),
2729            connected_at: Instant::now(),
2730            last_seen: Instant::now(),
2731            status: ConnectionStatus::Connected,
2732            protocols: vec!["test-protocol".to_string()],
2733            heartbeat_count: 0,
2734        };
2735
2736        let peer2_info = PeerInfo {
2737            peer_id: peer2_id.clone(),
2738            addresses: peer2_addrs.clone(),
2739            connected_at: Instant::now(),
2740            last_seen: Instant::now(),
2741            status: ConnectionStatus::Connected,
2742            protocols: vec!["test-protocol".to_string()],
2743            heartbeat_count: 0,
2744        };
2745
2746        node.transport
2747            .inject_peer(peer1_id.clone(), peer1_info)
2748            .await;
2749        node.transport
2750            .inject_peer(peer2_id.clone(), peer2_info)
2751            .await;
2752
2753        // Also add to active_connections (list_active_connections iterates over this)
2754        node.transport
2755            .inject_active_connection(peer1_id.clone())
2756            .await;
2757        node.transport
2758            .inject_active_connection(peer2_id.clone())
2759            .await;
2760
2761        // Test: List all active connections
2762        let connections = node.list_active_connections().await;
2763        assert_eq!(connections.len(), 2);
2764
2765        // Verify peer1 and peer2 are in the list
2766        let peer1_conn = connections.iter().find(|(id, _)| id == &peer1_id);
2767        let peer2_conn = connections.iter().find(|(id, _)| id == &peer2_id);
2768
2769        assert!(peer1_conn.is_some());
2770        assert!(peer2_conn.is_some());
2771
2772        // Verify addresses match
2773        assert_eq!(peer1_conn.unwrap().1, peer1_addrs);
2774        assert_eq!(peer2_conn.unwrap().1, peer2_addrs);
2775
2776        Ok(())
2777    }
2778
2779    #[tokio::test]
2780    async fn test_remove_peer_success() -> Result<()> {
2781        let config = create_test_node_config();
2782        let node = P2PNode::new(config).await?;
2783
2784        // Add a peer
2785        let peer_id = "peer_to_remove".to_string();
2786        let peer_info = PeerInfo {
2787            peer_id: peer_id.clone(),
2788            addresses: vec!["192.168.1.100:9000".to_string()],
2789            connected_at: Instant::now(),
2790            last_seen: Instant::now(),
2791            status: ConnectionStatus::Connected,
2792            protocols: vec!["test-protocol".to_string()],
2793            heartbeat_count: 0,
2794        };
2795
2796        node.transport.inject_peer(peer_id.clone(), peer_info).await;
2797
2798        // Verify peer exists
2799        assert!(node.is_peer_connected(&peer_id).await);
2800
2801        // Remove the peer
2802        let removed = node.remove_peer(&peer_id).await;
2803        assert!(removed);
2804
2805        // Verify peer no longer exists
2806        assert!(!node.is_peer_connected(&peer_id).await);
2807
2808        Ok(())
2809    }
2810
2811    #[tokio::test]
2812    async fn test_remove_peer_nonexistent() -> Result<()> {
2813        let config = create_test_node_config();
2814        let node = P2PNode::new(config).await?;
2815
2816        // Try to remove a peer that doesn't exist
2817        let removed = node.remove_peer(&"nonexistent_peer".to_string()).await;
2818        assert!(!removed);
2819
2820        Ok(())
2821    }
2822
2823    #[tokio::test]
2824    async fn test_is_peer_connected() -> Result<()> {
2825        let config = create_test_node_config();
2826        let node = P2PNode::new(config).await?;
2827
2828        let peer_id = "test_peer".to_string();
2829
2830        // Initially not connected
2831        assert!(!node.is_peer_connected(&peer_id).await);
2832
2833        // Add peer
2834        let peer_info = PeerInfo {
2835            peer_id: peer_id.clone(),
2836            addresses: vec!["192.168.1.100:9000".to_string()],
2837            connected_at: Instant::now(),
2838            last_seen: Instant::now(),
2839            status: ConnectionStatus::Connected,
2840            protocols: vec!["test-protocol".to_string()],
2841            heartbeat_count: 0,
2842        };
2843
2844        node.transport.inject_peer(peer_id.clone(), peer_info).await;
2845
2846        // Now connected
2847        assert!(node.is_peer_connected(&peer_id).await);
2848
2849        // Remove peer
2850        node.remove_peer(&peer_id).await;
2851
2852        // No longer connected
2853        assert!(!node.is_peer_connected(&peer_id).await);
2854
2855        Ok(())
2856    }
2857
2858    #[test]
2859    fn test_normalize_ipv6_wildcard() {
2860        use std::net::{IpAddr, Ipv6Addr, SocketAddr};
2861
2862        let wildcard = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 8080);
2863        let normalized = normalize_wildcard_to_loopback(wildcard);
2864
2865        assert_eq!(normalized.ip(), IpAddr::V6(Ipv6Addr::LOCALHOST));
2866        assert_eq!(normalized.port(), 8080);
2867    }
2868
2869    #[test]
2870    fn test_normalize_ipv4_wildcard() {
2871        use std::net::{IpAddr, Ipv4Addr, SocketAddr};
2872
2873        let wildcard = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9000);
2874        let normalized = normalize_wildcard_to_loopback(wildcard);
2875
2876        assert_eq!(normalized.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
2877        assert_eq!(normalized.port(), 9000);
2878    }
2879
2880    #[test]
2881    fn test_normalize_specific_address_unchanged() {
2882        let specific: std::net::SocketAddr = "192.168.1.100:3000".parse().unwrap();
2883        let normalized = normalize_wildcard_to_loopback(specific);
2884
2885        assert_eq!(normalized, specific);
2886    }
2887
2888    #[test]
2889    fn test_normalize_loopback_unchanged() {
2890        use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
2891
2892        let loopback_v6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5000);
2893        let normalized_v6 = normalize_wildcard_to_loopback(loopback_v6);
2894        assert_eq!(normalized_v6, loopback_v6);
2895
2896        let loopback_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
2897        let normalized_v4 = normalize_wildcard_to_loopback(loopback_v4);
2898        assert_eq!(normalized_v4, loopback_v4);
2899    }
2900
2901    // ---- parse_protocol_message regression tests ----
2902
2903    /// Get current Unix timestamp for tests
2904    fn current_timestamp() -> u64 {
2905        std::time::SystemTime::now()
2906            .duration_since(std::time::UNIX_EPOCH)
2907            .map(|d| d.as_secs())
2908            .unwrap_or(0)
2909    }
2910
2911    /// Helper to create a postcard-serialized WireMessage for tests
2912    fn make_wire_bytes(protocol: &str, data: Vec<u8>, from: &str, timestamp: u64) -> Vec<u8> {
2913        let msg = WireMessage {
2914            protocol: protocol.to_string(),
2915            data,
2916            from: from.to_string(),
2917            timestamp,
2918        };
2919        postcard::to_stdvec(&msg).unwrap()
2920    }
2921
2922    #[test]
2923    fn test_parse_protocol_message_uses_transport_peer_id_as_source() {
2924        // Regression: P2PEvent::Message.source must be the transport peer ID,
2925        // NOT the "from" field from the wire message.  This ensures consumers
2926        // can pass source directly to send_message().
2927        let transport_id = "abcdef0123456789";
2928        let logical_id = "spoofed-logical-id";
2929        let bytes = make_wire_bytes("test/v1", vec![1, 2, 3], logical_id, current_timestamp());
2930
2931        let event =
2932            parse_protocol_message(&bytes, transport_id).expect("valid message should parse");
2933
2934        match event {
2935            P2PEvent::Message {
2936                topic,
2937                source,
2938                data,
2939            } => {
2940                assert_eq!(source, transport_id, "source must be the transport peer ID");
2941                assert_ne!(
2942                    source, logical_id,
2943                    "source must NOT be the logical 'from' field"
2944                );
2945                assert_eq!(topic, "test/v1");
2946                assert_eq!(data, vec![1u8, 2, 3]);
2947            }
2948            other => panic!("expected P2PEvent::Message, got {:?}", other),
2949        }
2950    }
2951
2952    #[test]
2953    fn test_parse_protocol_message_rejects_invalid_bytes() {
2954        // Random bytes that are not valid bincode should be rejected
2955        assert!(parse_protocol_message(b"not valid bincode", "peer-id").is_none());
2956    }
2957
2958    #[test]
2959    fn test_parse_protocol_message_rejects_truncated_message() {
2960        // A truncated bincode message should fail to deserialize
2961        let full_bytes = make_wire_bytes("test/v1", vec![1, 2, 3], "sender", current_timestamp());
2962        let truncated = &full_bytes[..full_bytes.len() / 2];
2963        assert!(parse_protocol_message(truncated, "peer-id").is_none());
2964    }
2965
2966    #[test]
2967    fn test_parse_protocol_message_empty_payload() {
2968        let bytes = make_wire_bytes("ping", vec![], "sender", current_timestamp());
2969
2970        let event = parse_protocol_message(&bytes, "transport-peer")
2971            .expect("valid message with empty data should parse");
2972
2973        match event {
2974            P2PEvent::Message { data, .. } => assert!(data.is_empty()),
2975            other => panic!("expected P2PEvent::Message, got {:?}", other),
2976        }
2977    }
2978
2979    #[test]
2980    fn test_parse_protocol_message_preserves_binary_payload() {
2981        // Verify that arbitrary byte values (including 0xFF, 0x00) survive round-trip
2982        let payload: Vec<u8> = (0..=255).collect();
2983        let bytes = make_wire_bytes("binary/v1", payload.clone(), "sender", current_timestamp());
2984
2985        let event = parse_protocol_message(&bytes, "peer-id")
2986            .expect("valid message with full byte range should parse");
2987
2988        match event {
2989            P2PEvent::Message { data, topic, .. } => {
2990                assert_eq!(topic, "binary/v1");
2991                assert_eq!(
2992                    data, payload,
2993                    "payload must survive bincode round-trip exactly"
2994                );
2995            }
2996            other => panic!("expected P2PEvent::Message, got {:?}", other),
2997        }
2998    }
2999}