Skip to main content

saorsa_core/
network.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: david@saorsalabs.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! Network module
15//!
16//! This module provides core networking functionality for the P2P Foundation.
17//! It handles peer connections, network events, and node lifecycle management.
18
19use crate::PeerId;
20use crate::adaptive::{AdaptiveDHT, AdaptiveDhtConfig, TrustEngine, TrustEvent};
21use crate::bootstrap::{BootstrapConfig, BootstrapManager};
22use crate::dht_network_manager::{DhtNetworkConfig, DhtNetworkManager};
23use crate::error::{IdentityError, NetworkError, P2PError, P2pResult as Result};
24
25use crate::MultiAddr;
26use crate::identity::node_identity::{NodeIdentity, peer_id_from_public_key};
27use crate::quantum_crypto::saorsa_transport_integration::{MlDsaPublicKey, MlDsaSignature};
28use parking_lot::Mutex as ParkingMutex;
29use serde::{Deserialize, Serialize};
30use std::collections::HashMap;
31use std::sync::Arc;
32use std::sync::atomic::{AtomicBool, Ordering};
33use std::time::Duration;
34use tokio::sync::{Mutex as TokioMutex, RwLock, broadcast};
35use tokio::time::Instant;
36use tokio_util::sync::CancellationToken;
37use tracing::{debug, info, trace, warn};
38
39/// Wire protocol message format for P2P communication.
40///
41/// Serialized with postcard for compact binary encoding.
42#[derive(Debug, Clone, Serialize, Deserialize)]
43pub(crate) struct WireMessage {
44    /// Protocol/topic identifier
45    pub(crate) protocol: String,
46    /// Raw payload bytes
47    pub(crate) data: Vec<u8>,
48    /// Sender's peer ID (verified against transport-level identity)
49    pub(crate) from: PeerId,
50    /// Unix timestamp in seconds
51    pub(crate) timestamp: u64,
52    /// User agent string identifying the sender's software.
53    ///
54    /// Convention: `"node/<version>"` for full DHT participants,
55    /// `"client/<version>"` or `"<app>/<version>"` for ephemeral clients.
56    /// Included in the signed bytes — tamper-proof.
57    #[serde(default)]
58    pub(crate) user_agent: String,
59    /// Sender's ML-DSA-65 public key (1952 bytes). Empty if unsigned.
60    #[serde(default)]
61    pub(crate) public_key: Vec<u8>,
62    /// ML-DSA-65 signature over the signable bytes. Empty if unsigned.
63    #[serde(default)]
64    pub(crate) signature: Vec<u8>,
65}
66
67/// Operating mode of a P2P node.
68///
69/// Determines the default user agent and DHT participation behavior.
70/// `Node` peers participate in the DHT routing table; `Client` peers
71/// are treated as ephemeral and excluded from routing.
72#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
73pub enum NodeMode {
74    /// Full DHT-participant node that maintains routing state and routes messages.
75    #[default]
76    Node,
77    /// Ephemeral client that connects to perform operations without joining the DHT.
78    Client,
79}
80
81/// Internal listen mode controlling which network interfaces the node binds to.
82#[derive(Debug, Clone, Copy, PartialEq, Eq)]
83enum ListenMode {
84    /// Bind to all interfaces (`0.0.0.0` / `::`).
85    Public,
86    /// Bind to loopback only (`127.0.0.1` / `::1`).
87    Local,
88}
89
90/// Returns the default user agent string for the given mode.
91///
92/// - `Node` → `"node/<saorsa-core-version>"`
93/// - `Client` → `"client/<saorsa-core-version>"`
94pub fn user_agent_for_mode(mode: NodeMode) -> String {
95    let prefix = match mode {
96        NodeMode::Node => "node",
97        NodeMode::Client => "client",
98    };
99    format!("{prefix}/{}", env!("CARGO_PKG_VERSION"))
100}
101
102/// Returns `true` if the user agent identifies a full DHT participant (prefix `"node/"`).
103pub fn is_dht_participant(user_agent: &str) -> bool {
104    user_agent.starts_with("node/")
105}
106
107/// Capacity of the internal channel used by the message receiving system.
108pub(crate) const MESSAGE_RECV_CHANNEL_CAPACITY: usize = 256;
109
110/// Maximum number of concurrent in-flight request/response operations.
111pub(crate) const MAX_ACTIVE_REQUESTS: usize = 256;
112
113/// Maximum allowed timeout for a single request (5 minutes).
114pub(crate) const MAX_REQUEST_TIMEOUT: Duration = Duration::from_secs(300);
115
116/// Default listen port for the P2P node.
117const DEFAULT_LISTEN_PORT: u16 = 9000;
118
119/// Default maximum number of concurrent connections.
120const DEFAULT_MAX_CONNECTIONS: usize = 10_000;
121
122/// Default connection timeout in seconds.
123const DEFAULT_CONNECTION_TIMEOUT_SECS: u64 = 30;
124
125/// DHT max XOR distance (full 160-bit keyspace).
126const DHT_MAX_DISTANCE: u8 = 160;
127
128/// Number of cached bootstrap peers to retrieve.
129const BOOTSTRAP_PEER_BATCH_SIZE: usize = 20;
130
131/// Timeout in seconds for waiting on a bootstrap peer's identity exchange.
132const BOOTSTRAP_IDENTITY_TIMEOUT_SECS: u64 = 10;
133
134/// Serde helper — returns `true`.
135const fn default_true() -> bool {
136    true
137}
138
139/// Configuration for a P2P node
140#[derive(Debug, Clone, Serialize, Deserialize)]
141pub struct NodeConfig {
142    /// Bind to loopback only (`127.0.0.1` / `::1`).
143    ///
144    /// When `true`, the node listens on loopback addresses suitable for
145    /// local development and testing. When `false` (the default), the node
146    /// listens on all interfaces (`0.0.0.0` / `::`).
147    #[serde(default)]
148    pub local: bool,
149
150    /// Listen port. `0` means OS-assigned ephemeral port.
151    #[serde(default)]
152    pub port: u16,
153
154    /// Enable IPv6 dual-stack binding.
155    ///
156    /// When `true` (the default), both an IPv4 and an IPv6 address are
157    /// bound. When `false`, only IPv4 is used.
158    #[serde(default = "default_true")]
159    pub ipv6: bool,
160
161    /// Bootstrap peers to connect to on startup.
162    pub bootstrap_peers: Vec<crate::MultiAddr>,
163
164    // MCP removed; will be redesigned later
165    /// Connection timeout duration
166    pub connection_timeout: Duration,
167
168    /// Maximum number of concurrent connections
169    pub max_connections: usize,
170
171    /// DHT configuration
172    pub dht_config: DHTConfig,
173
174    /// Bootstrap cache configuration
175    pub bootstrap_cache_config: Option<BootstrapConfig>,
176
177    /// Optional IP diversity configuration for Sybil protection tuning.
178    ///
179    /// When set, this configuration is used by bootstrap peer discovery and
180    /// other diversity-enforcing subsystems. If `None`, defaults are used.
181    pub diversity_config: Option<crate::security::IPDiversityConfig>,
182
183    /// Optional override for the maximum application-layer message size.
184    ///
185    /// When `None`, the underlying saorsa-transport default is used.
186    #[serde(default)]
187    pub max_message_size: Option<usize>,
188
189    /// Optional node identity for app-level message signing.
190    ///
191    /// When set, outgoing messages are signed with the node's ML-DSA-65 key
192    /// and incoming signed messages are verified at the transport layer.
193    #[serde(skip)]
194    pub node_identity: Option<Arc<NodeIdentity>>,
195
196    /// Operating mode of this node.
197    ///
198    /// Determines the default user agent and DHT participation:
199    /// - `Node` → user agent `"node/<version>"`, added to DHT routing tables.
200    /// - `Client` → user agent `"client/<version>"`, treated as ephemeral.
201    #[serde(default)]
202    pub mode: NodeMode,
203
204    /// Optional custom user agent override.
205    ///
206    /// When `Some`, this value is used instead of the mode-derived default.
207    /// When `None`, the user agent is derived from [`NodeConfig::mode`].
208    #[serde(default, skip_serializing_if = "Option::is_none")]
209    pub custom_user_agent: Option<String>,
210
211    /// Allow loopback addresses (127.0.0.1, ::1) in the transport layer.
212    ///
213    /// In production, loopback addresses are rejected because they are not
214    /// routable. Enable this for local devnets and testnets where all nodes
215    /// run on the same machine.
216    ///
217    /// Default: `false`
218    #[serde(default)]
219    pub allow_loopback: bool,
220
221    /// Adaptive DHT configuration (trust-based blocking and eviction).
222    ///
223    /// Controls whether peers with low trust scores are evicted from the
224    /// routing table and blocked from DHT operations. Use
225    /// [`NodeConfigBuilder::trust_enforcement`] for a simple on/off toggle.
226    ///
227    /// Default: enabled with a block threshold of 0.15.
228    #[serde(default)]
229    pub adaptive_dht_config: AdaptiveDhtConfig,
230}
231
232/// DHT-specific configuration
233#[derive(Debug, Clone, Serialize, Deserialize)]
234pub struct DHTConfig {
235    /// Kademlia K parameter (bucket size)
236    pub k_value: usize,
237
238    /// Kademlia alpha parameter (parallelism)
239    pub alpha_value: usize,
240
241    /// DHT refresh interval
242    pub refresh_interval: Duration,
243}
244
245// ============================================================================
246// Address Construction Helpers
247// ============================================================================
248
249/// Build QUIC listen addresses based on port, IPv6 preference, and listen mode.
250///
251/// All returned addresses use the QUIC transport — the only transport
252/// currently supported for dialing. When additional transports are added,
253/// extend this function to produce addresses for those transports as well.
254///
255/// `ListenMode::Public` uses unspecified (all-interface) addresses;
256/// `ListenMode::Local` uses loopback addresses.
257#[inline]
258fn build_listen_addrs(port: u16, ipv6_enabled: bool, mode: ListenMode) -> Vec<MultiAddr> {
259    let mut addrs = Vec::with_capacity(if ipv6_enabled { 2 } else { 1 });
260
261    let (v4, v6) = match mode {
262        ListenMode::Public => (
263            std::net::Ipv4Addr::UNSPECIFIED,
264            std::net::Ipv6Addr::UNSPECIFIED,
265        ),
266        ListenMode::Local => (std::net::Ipv4Addr::LOCALHOST, std::net::Ipv6Addr::LOCALHOST),
267    };
268
269    if ipv6_enabled {
270        addrs.push(MultiAddr::quic(std::net::SocketAddr::new(
271            std::net::IpAddr::V6(v6),
272            port,
273        )));
274    }
275
276    addrs.push(MultiAddr::quic(std::net::SocketAddr::new(
277        std::net::IpAddr::V4(v4),
278        port,
279    )));
280
281    addrs
282}
283
284impl NodeConfig {
285    /// Returns the effective user agent string.
286    ///
287    /// If a custom user agent was set, returns that. Otherwise, derives
288    /// the user agent from the node's [`NodeMode`].
289    pub fn user_agent(&self) -> String {
290        self.custom_user_agent
291            .clone()
292            .unwrap_or_else(|| user_agent_for_mode(self.mode))
293    }
294
295    /// Compute the listen addresses from the configuration fields.
296    ///
297    /// The returned addresses are derived from [`local`](Self::local),
298    /// [`port`](Self::port), and [`ipv6`](Self::ipv6).
299    pub fn listen_addrs(&self) -> Vec<MultiAddr> {
300        let mode = if self.local {
301            ListenMode::Local
302        } else {
303            ListenMode::Public
304        };
305        build_listen_addrs(self.port, self.ipv6, mode)
306    }
307
308    /// Create a new NodeConfig with default values
309    ///
310    /// # Errors
311    ///
312    /// Returns an error if default addresses cannot be parsed
313    pub fn new() -> Result<Self> {
314        Ok(Self::default())
315    }
316
317    /// Create a builder for customized NodeConfig construction
318    pub fn builder() -> NodeConfigBuilder {
319        NodeConfigBuilder::default()
320    }
321}
322
323// ============================================================================
324// NodeConfig Builder Pattern
325// ============================================================================
326
327/// Builder for constructing [`NodeConfig`] with a transport-aware fluent API.
328///
329/// Defaults are chosen for quick local development:
330/// - QUIC on a random free port (`0`)
331/// - IPv6 enabled (dual-stack)
332/// - All interfaces (not local-only)
333///
334/// # Examples
335///
336/// ```rust,ignore
337/// // Simplest — QUIC on random port, IPv6 on, all interfaces
338/// let config = NodeConfig::builder().build()?;
339///
340/// // Local dev/test mode (loopback, auto-enables allow_loopback)
341/// let config = NodeConfig::builder()
342///     .local(true)
343///     .build()?;
344/// ```
345#[derive(Debug, Clone)]
346pub struct NodeConfigBuilder {
347    port: u16,
348    ipv6: bool,
349    local: bool,
350    bootstrap_peers: Vec<crate::MultiAddr>,
351    max_connections: Option<usize>,
352    connection_timeout: Option<Duration>,
353    dht_config: Option<DHTConfig>,
354    max_message_size: Option<usize>,
355    mode: NodeMode,
356    custom_user_agent: Option<String>,
357    allow_loopback: Option<bool>,
358    adaptive_dht_config: Option<AdaptiveDhtConfig>,
359}
360
361impl Default for NodeConfigBuilder {
362    fn default() -> Self {
363        Self {
364            port: 0,
365            ipv6: true,
366            local: false,
367            bootstrap_peers: Vec::new(),
368            max_connections: None,
369            connection_timeout: None,
370            dht_config: None,
371            max_message_size: None,
372            mode: NodeMode::default(),
373            custom_user_agent: None,
374            allow_loopback: None,
375            adaptive_dht_config: None,
376        }
377    }
378}
379
380impl NodeConfigBuilder {
381    /// Set the listen port. Default: `0` (random free port).
382    pub fn port(mut self, port: u16) -> Self {
383        self.port = port;
384        self
385    }
386
387    /// Enable or disable IPv6 dual-stack. Default: `true`.
388    pub fn ipv6(mut self, enabled: bool) -> Self {
389        self.ipv6 = enabled;
390        self
391    }
392
393    /// Bind to loopback only (`true`) or all interfaces (`false`).
394    ///
395    /// When `true`, automatically enables `allow_loopback` unless explicitly
396    /// overridden via [`Self::allow_loopback`].
397    ///
398    /// Default: `false` (all interfaces).
399    pub fn local(mut self, local: bool) -> Self {
400        self.local = local;
401        self
402    }
403
404    /// Add a bootstrap peer.
405    pub fn bootstrap_peer(mut self, addr: crate::MultiAddr) -> Self {
406        self.bootstrap_peers.push(addr);
407        self
408    }
409
410    /// Set maximum connections.
411    pub fn max_connections(mut self, max: usize) -> Self {
412        self.max_connections = Some(max);
413        self
414    }
415
416    /// Set connection timeout.
417    pub fn connection_timeout(mut self, timeout: Duration) -> Self {
418        self.connection_timeout = Some(timeout);
419        self
420    }
421
422    /// Set DHT configuration.
423    pub fn dht_config(mut self, config: DHTConfig) -> Self {
424        self.dht_config = Some(config);
425        self
426    }
427
428    /// Set maximum application-layer message size in bytes.
429    ///
430    /// If this method is not called, saorsa-transport's built-in default is used.
431    pub fn max_message_size(mut self, max_message_size: usize) -> Self {
432        self.max_message_size = Some(max_message_size);
433        self
434    }
435
436    /// Set the operating mode (Node or Client).
437    pub fn mode(mut self, mode: NodeMode) -> Self {
438        self.mode = mode;
439        self
440    }
441
442    /// Set a custom user agent string, overriding the mode-derived default.
443    pub fn custom_user_agent(mut self, user_agent: impl Into<String>) -> Self {
444        self.custom_user_agent = Some(user_agent.into());
445        self
446    }
447
448    /// Explicitly control whether loopback addresses are allowed in the
449    /// transport layer. When not called, `local(true)` auto-enables this;
450    /// `local(false)` defaults to `false`.
451    pub fn allow_loopback(mut self, allow: bool) -> Self {
452        self.allow_loopback = Some(allow);
453        self
454    }
455
456    /// Enable or disable trust-based peer eviction and blocking.
457    ///
458    /// When `false`, peers are never evicted from the routing table or
459    /// blocked from DHT operations based on trust scores. Trust scores
460    /// are still tracked but have no enforcement effect.
461    ///
462    /// When `true` (the default), peers whose trust score falls below the
463    /// block threshold (0.15) are immediately evicted and blocked.
464    ///
465    /// For fine-grained control over the threshold, use
466    /// [`adaptive_dht_config`](Self::adaptive_dht_config) instead.
467    pub fn trust_enforcement(mut self, enabled: bool) -> Self {
468        let threshold = if enabled {
469            AdaptiveDhtConfig::default().block_threshold
470        } else {
471            0.0
472        };
473        self.adaptive_dht_config = Some(AdaptiveDhtConfig {
474            block_threshold: threshold,
475        });
476        self
477    }
478
479    /// Set the full adaptive DHT configuration.
480    ///
481    /// Overrides any previous call to [`trust_enforcement`](Self::trust_enforcement).
482    pub fn adaptive_dht_config(mut self, config: AdaptiveDhtConfig) -> Self {
483        self.adaptive_dht_config = Some(config);
484        self
485    }
486
487    /// Build the [`NodeConfig`].
488    ///
489    /// # Errors
490    ///
491    /// Returns an error if address construction fails.
492    pub fn build(self) -> Result<NodeConfig> {
493        // local mode auto-enables allow_loopback unless explicitly overridden
494        let allow_loopback = self.allow_loopback.unwrap_or(self.local);
495
496        Ok(NodeConfig {
497            local: self.local,
498            port: self.port,
499            ipv6: self.ipv6,
500            bootstrap_peers: self.bootstrap_peers,
501            connection_timeout: self
502                .connection_timeout
503                .unwrap_or(Duration::from_secs(DEFAULT_CONNECTION_TIMEOUT_SECS)),
504            max_connections: self.max_connections.unwrap_or(DEFAULT_MAX_CONNECTIONS),
505            dht_config: self.dht_config.unwrap_or_default(),
506            bootstrap_cache_config: None,
507            diversity_config: None,
508            max_message_size: self.max_message_size,
509            node_identity: None,
510            mode: self.mode,
511            custom_user_agent: self.custom_user_agent,
512            allow_loopback,
513            adaptive_dht_config: self.adaptive_dht_config.unwrap_or_default(),
514        })
515    }
516}
517
518impl Default for NodeConfig {
519    fn default() -> Self {
520        Self {
521            local: false,
522            port: DEFAULT_LISTEN_PORT,
523            ipv6: true,
524            bootstrap_peers: Vec::new(),
525            connection_timeout: Duration::from_secs(DEFAULT_CONNECTION_TIMEOUT_SECS),
526            max_connections: DEFAULT_MAX_CONNECTIONS,
527            dht_config: DHTConfig::default(),
528            bootstrap_cache_config: None,
529            diversity_config: None,
530            max_message_size: None,
531            node_identity: None,
532            mode: NodeMode::default(),
533            custom_user_agent: None,
534            allow_loopback: false,
535            adaptive_dht_config: AdaptiveDhtConfig::default(),
536        }
537    }
538}
539
540impl DHTConfig {
541    const DEFAULT_K_VALUE: usize = 20;
542    const DEFAULT_ALPHA_VALUE: usize = 5;
543    const DEFAULT_REFRESH_INTERVAL_SECS: u64 = 600;
544}
545
546impl Default for DHTConfig {
547    fn default() -> Self {
548        Self {
549            k_value: Self::DEFAULT_K_VALUE,
550            alpha_value: Self::DEFAULT_ALPHA_VALUE,
551            refresh_interval: Duration::from_secs(Self::DEFAULT_REFRESH_INTERVAL_SECS),
552        }
553    }
554}
555
556/// Information about a connected peer
557#[derive(Debug, Clone)]
558pub struct PeerInfo {
559    /// Transport-level channel identifier (internal use only).
560    #[allow(dead_code)]
561    pub(crate) channel_id: String,
562
563    /// Peer's addresses
564    pub addresses: Vec<MultiAddr>,
565
566    /// Connection timestamp
567    pub connected_at: Instant,
568
569    /// Last seen timestamp
570    pub last_seen: Instant,
571
572    /// Connection status
573    pub status: ConnectionStatus,
574
575    /// Supported protocols
576    pub protocols: Vec<String>,
577
578    /// Number of heartbeats received
579    pub heartbeat_count: u64,
580}
581
582/// Connection status for a peer
583#[derive(Debug, Clone, PartialEq)]
584pub enum ConnectionStatus {
585    /// Connection is being established
586    Connecting,
587    /// Connection is established and active
588    Connected,
589    /// Connection is being closed
590    Disconnecting,
591    /// Connection is closed
592    Disconnected,
593    /// Connection failed
594    Failed(String),
595}
596
597/// Network events that can occur in the P2P system
598///
599/// Events are broadcast to all listeners and provide real-time
600/// notifications of network state changes and message arrivals.
601#[derive(Debug, Clone)]
602pub enum P2PEvent {
603    /// Message received from a peer on a specific topic
604    Message {
605        /// Topic or channel the message was sent on
606        topic: String,
607        /// For signed messages this is the authenticated app-level [`PeerId`];
608        /// `None` for unsigned messages.
609        source: Option<PeerId>,
610        /// Raw message data payload
611        data: Vec<u8>,
612    },
613    /// An authenticated peer has connected (first signed message verified on any channel).
614    /// The `user_agent` identifies the remote software (e.g. `"node/0.12.1"`, `"client/1.0"`).
615    PeerConnected(PeerId, String),
616    /// An authenticated peer has fully disconnected (all channels closed).
617    PeerDisconnected(PeerId),
618}
619
620/// Response from a peer to a request sent via [`P2PNode::send_request`].
621///
622/// Contains the response payload along with metadata about the responder
623/// and round-trip latency.
624#[derive(Debug, Clone)]
625pub struct PeerResponse {
626    /// The peer that sent the response.
627    pub peer_id: PeerId,
628    /// Raw response payload bytes.
629    pub data: Vec<u8>,
630    /// Round-trip latency from request to response.
631    pub latency: Duration,
632}
633
634/// Wire format for request/response correlation.
635///
636/// Wraps application payloads with a message ID and direction flag
637/// so the receive loop can route responses back to waiting callers.
638#[derive(Debug, Clone, Serialize, Deserialize)]
639pub(crate) struct RequestResponseEnvelope {
640    /// Unique identifier to correlate request ↔ response.
641    pub(crate) message_id: String,
642    /// `false` for requests, `true` for responses.
643    pub(crate) is_response: bool,
644    /// Application payload.
645    pub(crate) payload: Vec<u8>,
646}
647
648/// An in-flight request awaiting a response from a specific peer.
649pub(crate) struct PendingRequest {
650    /// Oneshot sender for delivering the response payload.
651    pub(crate) response_tx: tokio::sync::oneshot::Sender<Vec<u8>>,
652    /// The peer we expect the response from (for origin validation).
653    pub(crate) expected_peer: PeerId,
654}
655
656/// Maximum time to wait for identity exchange during a reconnect-on-send dial.
657const RECONNECT_IDENTITY_TIMEOUT: Duration = Duration::from_secs(5);
658
659/// Short grace period after closing stale QUIC connections before re-dialing.
660///
661/// `disconnect_channel` is async and waits for the QUIC close, but the
662/// transport endpoint may need a moment to fully release internal state.
663/// Only applied when stale channels were actually disconnected.
664const QUIC_TEARDOWN_GRACE: Duration = Duration::from_millis(100);
665
666/// Main P2P network node that manages connections, routing, and communication
667///
668/// This struct represents a complete P2P network participant that can:
669/// - Connect to other peers via QUIC transport
670/// - Participate in distributed hash table (DHT) operations
671/// - Send and receive messages through various protocols
672/// - Handle network events and peer lifecycle
673///
674/// Transport concerns (connections, messaging, events) are delegated to
675/// [`TransportHandle`](crate::transport_handle::TransportHandle).
676pub struct P2PNode {
677    /// Node configuration
678    config: NodeConfig,
679
680    /// Our peer ID
681    peer_id: PeerId,
682
683    /// Transport handle owning all QUIC / peer / event state
684    transport: Arc<crate::transport_handle::TransportHandle>,
685
686    /// Node start time
687    start_time: Instant,
688
689    /// Shutdown token — cancelled when the node should stop
690    shutdown: CancellationToken,
691
692    /// Adaptive DHT layer — owns both the DHT manager and the trust engine.
693    /// All DHT operations and trust signals go through this component.
694    adaptive_dht: AdaptiveDHT,
695
696    /// Bootstrap cache manager for peer discovery
697    bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,
698
699    /// Bootstrap state tracking - indicates whether peer discovery has completed
700    is_bootstrapped: Arc<AtomicBool>,
701
702    /// Whether `start()` has been called (and `stop()` has not yet completed)
703    is_started: Arc<AtomicBool>,
704
705    /// Per-peer locks that serialise reconnect attempts so concurrent sends
706    /// to the same stale peer don't race to dial.  Entries accumulate over
707    /// the node's lifetime; each is a lightweight `Arc<TokioMutex<()>>`.
708    reconnect_locks: ParkingMutex<HashMap<PeerId, Arc<TokioMutex<()>>>>,
709}
710
711/// Normalize wildcard bind addresses to localhost loopback addresses
712///
713/// saorsa-transport correctly rejects "unspecified" addresses (0.0.0.0 and [::]) for remote connections
714/// because you cannot connect TO an unspecified address - these are only valid for BINDING.
715///
716/// This function converts wildcard addresses to appropriate loopback addresses for local connections:
717/// - IPv6 [::]:port → ::1:port (IPv6 loopback)
718/// - IPv4 0.0.0.0:port → 127.0.0.1:port (IPv4 loopback)
719/// - All other addresses pass through unchanged
720///
721/// # Arguments
722/// * `addr` - The SocketAddr to normalize
723///
724/// # Returns
725/// * Normalized SocketAddr suitable for remote connections
726pub(crate) fn normalize_wildcard_to_loopback(addr: std::net::SocketAddr) -> std::net::SocketAddr {
727    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
728
729    if addr.ip().is_unspecified() {
730        // Convert unspecified addresses to loopback
731        let loopback_ip = match addr {
732            std::net::SocketAddr::V6(_) => IpAddr::V6(Ipv6Addr::LOCALHOST), // ::1
733            std::net::SocketAddr::V4(_) => IpAddr::V4(Ipv4Addr::LOCALHOST), // 127.0.0.1
734        };
735        std::net::SocketAddr::new(loopback_ip, addr.port())
736    } else {
737        // Not a wildcard address, pass through unchanged
738        addr
739    }
740}
741
742impl P2PNode {
743    /// Create a new P2P node with the given configuration
744    pub async fn new(config: NodeConfig) -> Result<Self> {
745        // Ensure a cryptographic identity exists — generate one if not provided.
746        let node_identity = match config.node_identity.clone() {
747            Some(identity) => identity,
748            None => Arc::new(NodeIdentity::generate()?),
749        };
750
751        // Derive the canonical peer ID from the cryptographic identity.
752        let peer_id = *node_identity.peer_id();
753
754        // Initialize bootstrap cache manager
755        let bootstrap_config = config.bootstrap_cache_config.clone().unwrap_or_default();
756        let bootstrap_manager =
757            match BootstrapManager::with_node_config(bootstrap_config, &config).await {
758                Ok(manager) => Some(Arc::new(RwLock::new(manager))),
759                Err(e) => {
760                    warn!("Failed to initialize bootstrap manager: {e}, continuing without cache");
761                    None
762                }
763            };
764
765        // Build transport handle with all transport-level concerns
766        let transport_config = crate::transport_handle::TransportConfig::from_node_config(
767            &config,
768            crate::DEFAULT_EVENT_CHANNEL_CAPACITY,
769            node_identity.clone(),
770        );
771        let transport =
772            Arc::new(crate::transport_handle::TransportHandle::new(transport_config).await?);
773
774        // Initialize AdaptiveDHT — creates the trust engine and DHT manager
775        let manager_dht_config = crate::dht::DHTConfig {
776            bucket_size: config.dht_config.k_value,
777            alpha: config.dht_config.alpha_value,
778            bucket_refresh_interval: config.dht_config.refresh_interval,
779            max_distance: DHT_MAX_DISTANCE,
780        };
781        let dht_manager_config = DhtNetworkConfig {
782            peer_id,
783            dht_config: manager_dht_config,
784            node_config: config.clone(),
785            request_timeout: config.connection_timeout,
786            max_concurrent_operations: MAX_ACTIVE_REQUESTS,
787            enable_security: true,
788            block_threshold: 0.0, // Set by AdaptiveDHT::new() from AdaptiveDhtConfig
789        };
790        let adaptive_dht = AdaptiveDHT::new(
791            transport.clone(),
792            dht_manager_config,
793            config.adaptive_dht_config.clone(),
794        )
795        .await?;
796
797        let node = Self {
798            config,
799            peer_id,
800            transport,
801            start_time: Instant::now(),
802            shutdown: CancellationToken::new(),
803            adaptive_dht,
804            bootstrap_manager,
805            is_bootstrapped: Arc::new(AtomicBool::new(false)),
806            is_started: Arc::new(AtomicBool::new(false)),
807            reconnect_locks: ParkingMutex::new(HashMap::new()),
808        };
809        info!(
810            "Created P2P node with peer ID: {} (call start() to begin networking)",
811            node.peer_id
812        );
813
814        Ok(node)
815    }
816
817    /// Get the peer ID of this node.
818    pub fn peer_id(&self) -> &PeerId {
819        &self.peer_id
820    }
821
822    /// Get the transport handle for sharing with other components.
823    pub fn transport(&self) -> &Arc<crate::transport_handle::TransportHandle> {
824        &self.transport
825    }
826
827    pub fn local_addr(&self) -> Option<MultiAddr> {
828        self.transport.local_addr()
829    }
830
831    /// Check if the node has completed the initial bootstrap process
832    ///
833    /// Returns `true` if the node has successfully connected to at least one
834    /// bootstrap peer and performed peer discovery (FIND_NODE).
835    pub fn is_bootstrapped(&self) -> bool {
836        self.is_bootstrapped.load(Ordering::SeqCst)
837    }
838
839    /// Manually trigger re-bootstrap (useful for recovery or network rejoin)
840    ///
841    /// This clears the bootstrapped state and attempts to reconnect to
842    /// bootstrap peers and discover new peers.
843    pub async fn re_bootstrap(&self) -> Result<()> {
844        self.is_bootstrapped.store(false, Ordering::SeqCst);
845        self.connect_bootstrap_peers().await
846    }
847
848    // =========================================================================
849    // Trust API — delegates to AdaptiveDHT
850    // =========================================================================
851
852    /// Get the trust engine for advanced use cases
853    pub fn trust_engine(&self) -> Arc<TrustEngine> {
854        self.adaptive_dht.trust_engine().clone()
855    }
856
857    /// Report a trust event for a peer.
858    ///
859    /// Records a network-observable outcome (connection success/failure)
860    /// that the DHT layer did not record automatically. See [`TrustEvent`]
861    /// for the supported variants.
862    ///
863    /// # Example
864    ///
865    /// ```rust,ignore
866    /// use saorsa_core::adaptive::TrustEvent;
867    ///
868    /// node.report_trust_event(&peer_id, TrustEvent::SuccessfulResponse).await;
869    /// node.report_trust_event(&peer_id, TrustEvent::ConnectionFailed).await;
870    /// ```
871    pub async fn report_trust_event(&self, peer_id: &PeerId, event: TrustEvent) {
872        self.adaptive_dht.report_trust_event(peer_id, event).await;
873    }
874
875    /// Get the current trust score for a peer (0.0 to 1.0).
876    ///
877    /// Returns 0.5 (neutral) for unknown peers.
878    pub fn peer_trust(&self, peer_id: &PeerId) -> f64 {
879        self.adaptive_dht.peer_trust(peer_id)
880    }
881
882    /// Get the AdaptiveDHT component for direct access
883    pub fn adaptive_dht(&self) -> &AdaptiveDHT {
884        &self.adaptive_dht
885    }
886
887    // =========================================================================
888    // Request/Response API — Automatic Trust Feedback
889    // =========================================================================
890
891    /// Send a request to a peer and wait for a response with automatic trust reporting.
892    ///
893    /// Unlike fire-and-forget `send_message()`, this method:
894    /// 1. Wraps the payload in a `RequestResponseEnvelope` with a unique message ID
895    /// 2. Sends it on the `/rr/<protocol>` protocol prefix
896    /// 3. Waits for a matching response (or timeout)
897    /// 4. Automatically reports success or failure to the trust engine
898    ///
899    /// The remote peer's handler should call `send_response()` with the
900    /// incoming message ID to route the response back.
901    ///
902    /// # Arguments
903    ///
904    /// * `peer_id` - Target peer
905    /// * `protocol` - Application protocol name (e.g. `"peer_info"`)
906    /// * `data` - Request payload bytes
907    /// * `timeout` - Maximum time to wait for a response
908    ///
909    /// # Returns
910    ///
911    /// A [`PeerResponse`] on success, or an error on timeout / connection failure.
912    ///
913    /// # Example
914    ///
915    /// ```rust,ignore
916    /// let response = node.send_request(&peer_id, "peer_info", request_data, Duration::from_secs(10)).await?;
917    /// println!("Got {} bytes from {}", response.data.len(), response.peer_id);
918    /// ```
919    pub async fn send_request(
920        &self,
921        peer_id: &PeerId,
922        protocol: &str,
923        data: Vec<u8>,
924        timeout: Duration,
925    ) -> Result<PeerResponse> {
926        // Fail fast for blocked peers
927        if self.adaptive_dht.peer_trust(peer_id) < self.adaptive_dht.config().block_threshold {
928            return Err(P2PError::Network(crate::error::NetworkError::PeerBlocked(
929                *peer_id,
930            )));
931        }
932
933        match self
934            .transport
935            .send_request(peer_id, protocol, data, timeout)
936            .await
937        {
938            Ok(resp) => {
939                self.report_trust_event(peer_id, TrustEvent::SuccessfulResponse)
940                    .await;
941                Ok(resp)
942            }
943            Err(e) => {
944                let event = if matches!(&e, P2PError::Timeout(_)) {
945                    TrustEvent::ConnectionTimeout
946                } else {
947                    TrustEvent::ConnectionFailed
948                };
949                self.report_trust_event(peer_id, event).await;
950                Err(e)
951            }
952        }
953    }
954
955    pub async fn send_response(
956        &self,
957        peer_id: &PeerId,
958        protocol: &str,
959        message_id: &str,
960        data: Vec<u8>,
961    ) -> Result<()> {
962        self.transport
963            .send_response(peer_id, protocol, message_id, data)
964            .await
965    }
966
967    pub fn parse_request_envelope(data: &[u8]) -> Option<(String, bool, Vec<u8>)> {
968        crate::transport_handle::TransportHandle::parse_request_envelope(data)
969    }
970
971    pub async fn subscribe(&self, topic: &str) -> Result<()> {
972        self.transport.subscribe(topic).await
973    }
974
975    pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
976        self.transport.publish(topic, data).await
977    }
978
979    /// Get the node configuration
980    pub fn config(&self) -> &NodeConfig {
981        &self.config
982    }
983
984    /// Start the P2P node
985    pub async fn start(&self) -> Result<()> {
986        info!("Starting P2P node...");
987
988        // Start bootstrap manager background tasks
989        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
990            let mut manager = bootstrap_manager.write().await;
991            manager
992                .start_maintenance()
993                .map_err(|e| protocol_error(format!("Failed to start bootstrap manager: {e}")))?;
994            info!("Bootstrap cache manager started");
995        }
996
997        // Start transport listeners and message receiving
998        self.transport.start_network_listeners().await?;
999
1000        // Start the adaptive DHT layer (DHT manager + trust engine)
1001        self.adaptive_dht.start().await?;
1002
1003        // Log current listen addresses
1004        let listen_addrs = self.transport.listen_addrs().await;
1005        info!("P2P node started on addresses: {:?}", listen_addrs);
1006
1007        // NOTE: Message receiving is now integrated into the accept loop in start_network_listeners()
1008        // The old start_message_receiving_system() is no longer needed as it competed with the accept
1009        // loop for incoming connections, causing messages to be lost.
1010
1011        // Connect to bootstrap peers
1012        self.connect_bootstrap_peers().await?;
1013
1014        self.is_started
1015            .store(true, std::sync::atomic::Ordering::Release);
1016
1017        Ok(())
1018    }
1019
1020    // start_network_listeners and start_message_receiving_system
1021    // are now implemented in TransportHandle
1022
1023    /// Run the P2P node (blocks until shutdown)
1024    pub async fn run(&self) -> Result<()> {
1025        if !self.is_running() {
1026            self.start().await?;
1027        }
1028
1029        info!("P2P node running...");
1030
1031        // Block until shutdown is signalled. All background work (connection
1032        // lifecycle, DHT maintenance, EigenTrust) runs in dedicated tasks.
1033        self.shutdown.cancelled().await;
1034
1035        info!("P2P node stopped");
1036        Ok(())
1037    }
1038
1039    /// Stop the P2P node
1040    pub async fn stop(&self) -> Result<()> {
1041        info!("Stopping P2P node...");
1042
1043        // Signal the run loop to exit
1044        self.shutdown.cancel();
1045
1046        // Stop DHT layer first so leave messages can be sent while transport is still active.
1047        self.adaptive_dht.stop().await?;
1048
1049        // Stop the transport layer (shutdown endpoints, join tasks, disconnect peers)
1050        self.transport.stop().await?;
1051
1052        self.is_started
1053            .store(false, std::sync::atomic::Ordering::Release);
1054
1055        info!("P2P node stopped");
1056        Ok(())
1057    }
1058
1059    /// Graceful shutdown alias for tests
1060    pub async fn shutdown(&self) -> Result<()> {
1061        self.stop().await
1062    }
1063
1064    /// Check if the node is running
1065    pub fn is_running(&self) -> bool {
1066        self.is_started.load(std::sync::atomic::Ordering::Acquire) && !self.shutdown.is_cancelled()
1067    }
1068
1069    /// Get the current listen addresses
1070    pub async fn listen_addrs(&self) -> Vec<MultiAddr> {
1071        self.transport.listen_addrs().await
1072    }
1073
1074    /// Get connected peers
1075    pub async fn connected_peers(&self) -> Vec<PeerId> {
1076        self.transport.connected_peers().await
1077    }
1078
1079    /// Get peer count
1080    pub async fn peer_count(&self) -> usize {
1081        self.transport.peer_count().await
1082    }
1083
1084    /// Get peer info
1085    pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1086        self.transport.peer_info(peer_id).await
1087    }
1088
1089    /// Get the channel ID for a given address, if connected (internal only).
1090    #[allow(dead_code)]
1091    pub(crate) async fn get_channel_id_by_address(&self, addr: &MultiAddr) -> Option<String> {
1092        self.transport.get_channel_id_by_address(addr).await
1093    }
1094
1095    /// List all active transport-level connections (internal only).
1096    #[allow(dead_code)]
1097    pub(crate) async fn list_active_connections(&self) -> Vec<(String, Vec<MultiAddr>)> {
1098        self.transport.list_active_connections().await
1099    }
1100
1101    /// Remove a channel from the peers map (internal only).
1102    #[allow(dead_code)]
1103    pub(crate) async fn remove_channel(&self, channel_id: &str) -> bool {
1104        self.transport.remove_channel(channel_id).await
1105    }
1106
1107    /// Close a channel's QUIC connection and remove it from all tracking maps.
1108    ///
1109    /// Use when a transport-level connection was established but identity
1110    /// exchange failed, so no [`PeerId`] is available for [`disconnect_peer`].
1111    pub(crate) async fn disconnect_channel(&self, channel_id: &str) {
1112        self.transport.disconnect_channel(channel_id).await;
1113    }
1114
1115    /// Check if an authenticated peer is connected (has at least one active channel).
1116    pub async fn is_peer_connected(&self, peer_id: &PeerId) -> bool {
1117        self.transport.is_peer_connected(peer_id).await
1118    }
1119
1120    /// Connect to a peer, returning the transport-level channel ID.
1121    ///
1122    /// The returned channel ID is **not** the app-level [`PeerId`]. To obtain
1123    /// the authenticated peer identity, call
1124    /// [`wait_for_peer_identity`](Self::wait_for_peer_identity) with the
1125    /// returned channel ID.
1126    pub async fn connect_peer(&self, address: &MultiAddr) -> Result<String> {
1127        self.transport.connect_peer(address).await
1128    }
1129
1130    /// Wait for the identity exchange on `channel_id` to complete, returning
1131    /// the authenticated [`PeerId`].
1132    ///
1133    /// Use this after [`connect_peer`](Self::connect_peer) to bridge the gap
1134    /// between the transport-level channel ID and the app-level peer identity
1135    /// required by [`send_message`](Self::send_message).
1136    pub async fn wait_for_peer_identity(
1137        &self,
1138        channel_id: &str,
1139        timeout: Duration,
1140    ) -> Result<PeerId> {
1141        self.transport
1142            .wait_for_peer_identity(channel_id, timeout)
1143            .await
1144    }
1145
1146    /// Disconnect from a peer
1147    pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1148        self.transport.disconnect_peer(peer_id).await
1149    }
1150
1151    /// Check if a connection to a peer is active (internal only).
1152    #[allow(dead_code)]
1153    pub(crate) async fn is_connection_active(&self, channel_id: &str) -> bool {
1154        self.transport.is_connection_active(channel_id).await
1155    }
1156
1157    /// Send a message to an authenticated peer, reconnecting on demand.
1158    ///
1159    /// Tries the existing connection first. If the send fails (stale QUIC
1160    /// session, peer not found, etc.), resolves a dial address from:
1161    ///
1162    /// 1. Caller-provided `addrs` (highest priority)
1163    /// 2. Addresses cached in the transport layer (snapshotted before the
1164    ///    send attempt, since stale-channel cleanup removes them)
1165    /// 3. DHT routing table
1166    ///
1167    /// Then dials, waits for identity exchange, and retries the send exactly
1168    /// once on the fresh connection.  Concurrent reconnects to the same peer
1169    /// are serialised so only one dial is attempted at a time.
1170    pub async fn send_message(
1171        &self,
1172        peer_id: &PeerId,
1173        protocol: &str,
1174        data: Vec<u8>,
1175        addrs: &[MultiAddr],
1176    ) -> Result<()> {
1177        // Snapshot channel IDs before the send attempt — transport.send_message
1178        // prunes dead channels from bookkeeping but does NOT close the
1179        // underlying QUIC connection.  We need the original IDs for
1180        // disconnect_channel later.
1181        let existing_channels = self.transport.channels_for_peer(peer_id).await;
1182
1183        // No existing connection — serialise so concurrent sends to the same
1184        // unconnected peer don't each open their own QUIC connection.
1185        if existing_channels.is_empty() {
1186            let lock = self.reconnect_lock_for(peer_id);
1187            let _guard = lock.lock().await;
1188
1189            // Another sender may have connected while we waited for the lock.
1190            if self.transport.is_peer_connected(peer_id).await {
1191                return self.transport.send_message(peer_id, protocol, data).await;
1192            }
1193
1194            return self
1195                .reconnect_and_send(peer_id, protocol, data, addrs, &[], &[])
1196                .await;
1197        }
1198
1199        // Snapshot addresses before the send attempt — transport.send_message
1200        // prunes stale channels, which removes peer_info.
1201        let saved_addrs: Vec<MultiAddr> = self
1202            .transport
1203            .peer_info(peer_id)
1204            .await
1205            .map(|info| info.addresses)
1206            .unwrap_or_default();
1207
1208        // Clone data for retry — transport.send_message consumes the Vec,
1209        // so we need a copy if the first attempt fails.
1210        let retry_data = data.clone();
1211
1212        // Fast path: try existing connection.
1213        match self.transport.send_message(peer_id, protocol, data).await {
1214            Ok(()) => return Ok(()),
1215            Err(e) => {
1216                debug!(
1217                    peer = %peer_id.to_hex(),
1218                    error = %e,
1219                    "send failed, attempting reconnect",
1220                );
1221            }
1222        }
1223
1224        // Serialise reconnect attempts so concurrent sends to the same
1225        // stale peer don't race to dial.
1226        let lock = self.reconnect_lock_for(peer_id);
1227        let _guard = lock.lock().await;
1228
1229        // Another sender may have reconnected while we waited for the lock.
1230        if self.transport.is_peer_connected(peer_id).await {
1231            // Close stale QUIC connections that remove_channel (called inside
1232            // transport.send_message on failure) didn't tear down — it only
1233            // removes bookkeeping, not the underlying QUIC session.
1234            for channel_id in &existing_channels {
1235                self.transport.disconnect_channel(channel_id).await;
1236            }
1237            return self
1238                .transport
1239                .send_message(peer_id, protocol, retry_data)
1240                .await;
1241        }
1242
1243        self.reconnect_and_send(
1244            peer_id,
1245            protocol,
1246            retry_data,
1247            addrs,
1248            &saved_addrs,
1249            &existing_channels,
1250        )
1251        .await
1252    }
1253
1254    /// Tear down stale channels, reconnect to a peer, and send a message.
1255    async fn reconnect_and_send(
1256        &self,
1257        peer_id: &PeerId,
1258        protocol: &str,
1259        data: Vec<u8>,
1260        addrs: &[MultiAddr],
1261        saved_addrs: &[MultiAddr],
1262        stale_channels: &[String],
1263    ) -> Result<()> {
1264        // Resolve a dial address: caller-provided > saved > DHT.
1265        let address = self
1266            .resolve_dial_address(peer_id, addrs, saved_addrs)
1267            .await
1268            .ok_or_else(|| {
1269                P2PError::Network(NetworkError::PeerNotFound(peer_id.to_hex().into()))
1270            })?;
1271
1272        // Tear down stale QUIC connections using their actual channel IDs.
1273        // transport.send_message only removes bookkeeping (peer_to_channel,
1274        // peers, active_connections) — it does NOT close the underlying QUIC
1275        // connection.  We must use the real channel IDs, not the resolved
1276        // dial address, because NAT / port migration can make them differ.
1277        if !stale_channels.is_empty() {
1278            for channel_id in stale_channels {
1279                self.transport.disconnect_channel(channel_id).await;
1280            }
1281            tokio::time::sleep(QUIC_TEARDOWN_GRACE).await;
1282        }
1283
1284        // Dial and wait for identity exchange.
1285        let channel_id = self.transport.connect_peer(&address).await?;
1286        let authenticated = match self
1287            .transport
1288            .wait_for_peer_identity(&channel_id, RECONNECT_IDENTITY_TIMEOUT)
1289            .await
1290        {
1291            Ok(peer) => peer,
1292            Err(e) => {
1293                // Close the freshly-dialed QUIC connection so it doesn't
1294                // linger as a zombie until idle timeout.
1295                self.transport.disconnect_channel(&channel_id).await;
1296                return Err(e);
1297            }
1298        };
1299
1300        if &authenticated != peer_id {
1301            self.transport.disconnect_channel(&channel_id).await;
1302            return Err(P2PError::Identity(IdentityError::IdentityMismatch {
1303                expected: peer_id.to_hex().into(),
1304                actual: authenticated.to_hex().into(),
1305            }));
1306        }
1307
1308        // Send on the fresh connection.
1309        self.transport.send_message(peer_id, protocol, data).await
1310    }
1311
1312    /// Resolve a dial address for `peer_id`, preferring caller-provided
1313    /// addresses over cached/DHT sources.
1314    ///
1315    /// Returns the first dialable (QUIC, non-unspecified) address found, or
1316    /// `None` when no address is available.
1317    async fn resolve_dial_address(
1318        &self,
1319        peer_id: &PeerId,
1320        caller_addrs: &[MultiAddr],
1321        saved_addrs: &[MultiAddr],
1322    ) -> Option<MultiAddr> {
1323        // 1. Caller-provided addresses (highest priority).
1324        if let Some(addr) = Self::first_dialable(caller_addrs) {
1325            return Some(addr);
1326        }
1327
1328        // 2. Addresses snapshotted from the transport layer before the send
1329        //    attempt cleaned them up.
1330        if let Some(addr) = Self::first_dialable(saved_addrs) {
1331            return Some(addr);
1332        }
1333
1334        // 3. DHT routing table — apply the same dialability filter.
1335        let dht_addrs = self.adaptive_dht.peer_addresses_for_dial(peer_id).await;
1336        Self::first_dialable(&dht_addrs)
1337    }
1338
1339    /// Return the first dialable QUIC address from a slice, skipping
1340    /// non-QUIC and unspecified (`0.0.0.0` / `::`) addresses.
1341    fn first_dialable(addrs: &[MultiAddr]) -> Option<MultiAddr> {
1342        addrs
1343            .iter()
1344            .find(|a| {
1345                let dialable = a
1346                    .dialable_socket_addr()
1347                    .is_some_and(|sa| !sa.ip().is_unspecified());
1348                if !dialable {
1349                    trace!(address = %a, "skipping non-dialable address");
1350                }
1351                dialable
1352            })
1353            .cloned()
1354    }
1355
1356    /// Get or create a per-peer reconnect lock.
1357    fn reconnect_lock_for(&self, peer_id: &PeerId) -> Arc<TokioMutex<()>> {
1358        self.reconnect_locks
1359            .lock()
1360            .entry(*peer_id)
1361            .or_insert_with(|| Arc::new(TokioMutex::new(())))
1362            .clone()
1363    }
1364}
1365
1366/// Parse a postcard-encoded protocol message into a `P2PEvent::Message`.
1367///
1368/// Returns `None` if the bytes cannot be deserialized as a valid `WireMessage`.
1369///
1370/// The `from` field is a required part of the wire protocol but is **not**
1371/// used as the event source. Instead, `source` — the transport-level peer ID
1372/// derived from the authenticated QUIC connection — is used so that consumers
1373/// can pass it directly to `send_message()`. This eliminates a spoofing
1374/// vector where a peer could claim an arbitrary identity via the payload.
1375///
1376/// Maximum allowed clock skew for message timestamps (5 minutes).
1377/// This is intentionally lenient for initial deployment to accommodate nodes with
1378/// misconfigured clocks or high-latency network conditions. Can be tightened (e.g., to 60s)
1379/// once the network stabilizes and node clock synchronization improves.
1380const MAX_MESSAGE_AGE_SECS: u64 = 300;
1381/// Maximum allowed future timestamp (30 seconds to account for clock drift)
1382const MAX_FUTURE_SECS: u64 = 30;
1383
1384/// Convenience constructor for `P2PError::Network(NetworkError::ProtocolError(...))`.
1385fn protocol_error(msg: impl std::fmt::Display) -> P2PError {
1386    P2PError::Network(NetworkError::ProtocolError(msg.to_string().into()))
1387}
1388
1389/// Helper to send an event via a broadcast sender, logging at trace level if no receivers.
1390pub(crate) fn broadcast_event(tx: &broadcast::Sender<P2PEvent>, event: P2PEvent) {
1391    if let Err(e) = tx.send(event) {
1392        tracing::trace!("Event broadcast has no receivers: {e}");
1393    }
1394}
1395
1396/// Result of parsing a protocol message, including optional authenticated identity.
1397pub(crate) struct ParsedMessage {
1398    /// The P2P event to broadcast.
1399    pub(crate) event: P2PEvent,
1400    /// If the message was signed and verified, the authenticated app-level [`PeerId`].
1401    pub(crate) authenticated_node_id: Option<PeerId>,
1402    /// The sender's user agent string from the wire message.
1403    pub(crate) user_agent: String,
1404}
1405
1406pub(crate) fn parse_protocol_message(bytes: &[u8], source: &str) -> Option<ParsedMessage> {
1407    let message: WireMessage = postcard::from_bytes(bytes).ok()?;
1408
1409    // Validate timestamp to prevent replay attacks
1410    let now = std::time::SystemTime::now()
1411        .duration_since(std::time::UNIX_EPOCH)
1412        .map(|d| d.as_secs())
1413        .unwrap_or(0);
1414
1415    // Reject messages that are too old (potential replay)
1416    if message.timestamp < now.saturating_sub(MAX_MESSAGE_AGE_SECS) {
1417        tracing::warn!(
1418            "Rejecting stale message from {} (timestamp {} is {} seconds old)",
1419            source,
1420            message.timestamp,
1421            now.saturating_sub(message.timestamp)
1422        );
1423        return None;
1424    }
1425
1426    // Reject messages too far in the future (clock manipulation)
1427    if message.timestamp > now + MAX_FUTURE_SECS {
1428        tracing::warn!(
1429            "Rejecting future-dated message from {} (timestamp {} is {} seconds ahead)",
1430            source,
1431            message.timestamp,
1432            message.timestamp.saturating_sub(now)
1433        );
1434        return None;
1435    }
1436
1437    // Verify app-level signature if present
1438    let authenticated_node_id = if !message.signature.is_empty() {
1439        match verify_message_signature(&message) {
1440            Ok(peer_id) => {
1441                debug!(
1442                    "Message from {} authenticated as app-level NodeId {}",
1443                    source, peer_id
1444                );
1445                Some(peer_id)
1446            }
1447            Err(e) => {
1448                warn!(
1449                    "Rejecting message from {}: signature verification failed: {}",
1450                    source, e
1451                );
1452                return None;
1453            }
1454        }
1455    } else {
1456        None
1457    };
1458
1459    debug!(
1460        "Parsed P2PEvent::Message - topic: {}, source: {:?} (transport: {}, logical: {}), payload_len: {}",
1461        message.protocol,
1462        authenticated_node_id,
1463        source,
1464        message.from,
1465        message.data.len()
1466    );
1467
1468    Some(ParsedMessage {
1469        event: P2PEvent::Message {
1470            topic: message.protocol,
1471            source: authenticated_node_id,
1472            data: message.data,
1473        },
1474        authenticated_node_id,
1475        user_agent: message.user_agent,
1476    })
1477}
1478
1479/// Verify the ML-DSA-65 signature on a WireMessage and return the authenticated [`PeerId`].
1480///
1481/// Besides verifying the cryptographic signature, this also checks that the
1482/// self-asserted `from` field matches the [`PeerId`] derived from the public
1483/// key. This prevents a sender from signing with their real key while
1484/// claiming a different identity in the `from` field.
1485fn verify_message_signature(message: &WireMessage) -> std::result::Result<PeerId, String> {
1486    let pubkey = MlDsaPublicKey::from_bytes(&message.public_key)
1487        .map_err(|e| format!("invalid public key: {e:?}"))?;
1488
1489    let peer_id = peer_id_from_public_key(&pubkey);
1490
1491    // Validate that the self-asserted `from` field matches the public key.
1492    if message.from != peer_id {
1493        return Err(format!(
1494            "from field mismatch: message claims '{}' but public key derives '{}'",
1495            message.from, peer_id
1496        ));
1497    }
1498
1499    let signable = postcard::to_stdvec(&(
1500        &message.protocol,
1501        &message.data as &[u8],
1502        &message.from,
1503        message.timestamp,
1504        &message.user_agent,
1505    ))
1506    .map_err(|e| format!("failed to serialize signable bytes: {e}"))?;
1507
1508    let sig = MlDsaSignature::from_bytes(&message.signature)
1509        .map_err(|e| format!("invalid signature: {e:?}"))?;
1510
1511    let valid = crate::quantum_crypto::ml_dsa_verify(&pubkey, &signable, &sig)
1512        .map_err(|e| format!("verification error: {e}"))?;
1513
1514    if valid {
1515        Ok(peer_id)
1516    } else {
1517        Err("signature is invalid".to_string())
1518    }
1519}
1520
1521impl P2PNode {
1522    /// Subscribe to network events
1523    pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1524        self.transport.subscribe_events()
1525    }
1526
1527    /// Backwards-compat event stream accessor for tests
1528    pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1529        self.subscribe_events()
1530    }
1531
1532    /// Get node uptime
1533    pub fn uptime(&self) -> Duration {
1534        self.start_time.elapsed()
1535    }
1536
1537    // MCP removed: all MCP tool/service methods removed
1538
1539    // /// Handle MCP remote tool call with network integration
1540
1541    // /// List tools available on a specific remote peer
1542
1543    // /// Get MCP server statistics
1544
1545    // Background tasks (connection_lifecycle_monitor, keepalive, periodic_maintenance)
1546    // are now implemented in TransportHandle.
1547
1548    /// Check system health
1549    pub async fn health_check(&self) -> Result<()> {
1550        let peer_count = self.peer_count().await;
1551        if peer_count > self.config.max_connections {
1552            Err(protocol_error(format!(
1553                "Too many connections: {peer_count}"
1554            )))
1555        } else {
1556            Ok(())
1557        }
1558    }
1559
1560    /// Get the attached DHT manager.
1561    pub fn dht_manager(&self) -> &Arc<DhtNetworkManager> {
1562        self.adaptive_dht.dht_manager()
1563    }
1564
1565    /// Backwards-compatible alias for `dht_manager()`.
1566    pub fn dht(&self) -> &Arc<DhtNetworkManager> {
1567        self.dht_manager()
1568    }
1569
1570    /// Add a discovered peer to the bootstrap cache
1571    pub async fn add_discovered_peer(
1572        &self,
1573        _peer_id: PeerId,
1574        addresses: Vec<MultiAddr>,
1575    ) -> Result<()> {
1576        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1577            let manager = bootstrap_manager.read().await;
1578            let socket_addresses: Vec<std::net::SocketAddr> = addresses
1579                .iter()
1580                .filter_map(|addr| addr.socket_addr())
1581                .collect();
1582            if let Some(&primary) = socket_addresses.first() {
1583                manager
1584                    .add_peer(&primary, socket_addresses)
1585                    .await
1586                    .map_err(|e| {
1587                        protocol_error(format!("Failed to add peer to bootstrap cache: {e}"))
1588                    })?;
1589            }
1590        }
1591        Ok(())
1592    }
1593
1594    /// Update connection metrics for a peer in the bootstrap cache
1595    pub async fn update_peer_metrics(
1596        &self,
1597        addr: &MultiAddr,
1598        success: bool,
1599        latency_ms: Option<u64>,
1600        _error: Option<String>,
1601    ) -> Result<()> {
1602        if let Some(ref bootstrap_manager) = self.bootstrap_manager
1603            && let Some(sa) = addr.socket_addr()
1604        {
1605            let manager = bootstrap_manager.read().await;
1606            if success {
1607                let rtt_ms = latency_ms.unwrap_or(0) as u32;
1608                manager.record_success(&sa, rtt_ms).await;
1609            } else {
1610                manager.record_failure(&sa).await;
1611            }
1612        }
1613        Ok(())
1614    }
1615
1616    /// Get bootstrap cache statistics
1617    pub async fn get_bootstrap_cache_stats(
1618        &self,
1619    ) -> Result<Option<crate::bootstrap::BootstrapStats>> {
1620        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1621            let manager = bootstrap_manager.read().await;
1622            Ok(Some(manager.stats().await))
1623        } else {
1624            Ok(None)
1625        }
1626    }
1627
1628    /// Get the number of cached bootstrap peers
1629    pub async fn cached_peer_count(&self) -> usize {
1630        if let Some(ref _bootstrap_manager) = self.bootstrap_manager
1631            && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
1632        {
1633            return stats.total_peers;
1634        }
1635        0
1636    }
1637
1638    /// Connect to bootstrap peers and perform initial peer discovery
1639    async fn connect_bootstrap_peers(&self) -> Result<()> {
1640        // Each entry is a list of addresses for a single peer.
1641        let mut bootstrap_addr_sets: Vec<Vec<MultiAddr>> = Vec::new();
1642        let mut used_cache = false;
1643        let mut seen_addresses = std::collections::HashSet::new();
1644
1645        // Configured bootstrap peers take priority -- always include them first.
1646        if !self.config.bootstrap_peers.is_empty() {
1647            info!(
1648                "Using {} configured bootstrap peers (priority)",
1649                self.config.bootstrap_peers.len()
1650            );
1651            for multiaddr in &self.config.bootstrap_peers {
1652                let Some(socket_addr) = multiaddr.dialable_socket_addr() else {
1653                    warn!("Skipping non-QUIC bootstrap peer: {}", multiaddr);
1654                    continue;
1655                };
1656                seen_addresses.insert(socket_addr);
1657                bootstrap_addr_sets.push(vec![multiaddr.clone()]);
1658            }
1659        }
1660
1661        // Supplement with cached bootstrap peers (after CLI peers)
1662        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1663            let manager = bootstrap_manager.read().await;
1664            let cached_peers = manager.select_peers(BOOTSTRAP_PEER_BATCH_SIZE).await;
1665            if !cached_peers.is_empty() {
1666                let mut added_from_cache = 0;
1667                for cached in cached_peers {
1668                    let mut addrs = vec![cached.primary_address];
1669                    addrs.extend(cached.addresses);
1670                    // Only add addresses we haven't seen from CLI peers
1671                    let new_addresses: Vec<MultiAddr> = addrs
1672                        .into_iter()
1673                        .filter(|a| !seen_addresses.contains(a))
1674                        .map(MultiAddr::quic)
1675                        .collect();
1676
1677                    if !new_addresses.is_empty() {
1678                        for addr in &new_addresses {
1679                            if let Some(sa) = addr.socket_addr() {
1680                                seen_addresses.insert(sa);
1681                            }
1682                        }
1683                        bootstrap_addr_sets.push(new_addresses);
1684                        added_from_cache += 1;
1685                    }
1686                }
1687                if added_from_cache > 0 {
1688                    info!(
1689                        "Added {} cached bootstrap peers (supplementing CLI peers)",
1690                        added_from_cache
1691                    );
1692                    used_cache = true;
1693                }
1694            }
1695        }
1696
1697        if bootstrap_addr_sets.is_empty() {
1698            info!("No bootstrap peers configured and no cached peers available");
1699            return Ok(());
1700        }
1701
1702        // Connect to bootstrap peers, wait for identity exchange, then
1703        // perform DHT peer discovery using the real cryptographic PeerIds.
1704        let identity_timeout = Duration::from_secs(BOOTSTRAP_IDENTITY_TIMEOUT_SECS);
1705        let mut successful_connections = 0;
1706        let mut connected_peer_ids: Vec<PeerId> = Vec::new();
1707
1708        for addrs in &bootstrap_addr_sets {
1709            for addr in addrs {
1710                match self.connect_peer(addr).await {
1711                    Ok(channel_id) => {
1712                        // Wait for the remote peer's signed identity announce
1713                        // so we get a real cryptographic PeerId.
1714                        match self
1715                            .transport
1716                            .wait_for_peer_identity(&channel_id, identity_timeout)
1717                            .await
1718                        {
1719                            Ok(real_peer_id) => {
1720                                successful_connections += 1;
1721                                connected_peer_ids.push(real_peer_id);
1722
1723                                // Update bootstrap cache with successful connection
1724                                if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1725                                    let manager = bootstrap_manager.read().await;
1726                                    if let Some(sa) = addr.socket_addr() {
1727                                        manager.record_success(&sa, 100).await;
1728                                    }
1729                                }
1730                                break; // Successfully connected, move to next peer
1731                            }
1732                            Err(e) => {
1733                                warn!(
1734                                    "Timeout waiting for identity from bootstrap peer {}: {}, \
1735                                     closing channel {}",
1736                                    addr, e, channel_id
1737                                );
1738                                self.disconnect_channel(&channel_id).await;
1739                            }
1740                        }
1741                    }
1742                    Err(e) => {
1743                        warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
1744
1745                        // Update bootstrap cache with failed connection
1746                        if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
1747                            let manager = bootstrap_manager.read().await;
1748                            if let Some(sa) = addr.socket_addr() {
1749                                manager.record_failure(&sa).await;
1750                            }
1751                        }
1752                    }
1753                }
1754            }
1755        }
1756
1757        if successful_connections == 0 {
1758            if !used_cache {
1759                warn!("Failed to connect to any bootstrap peers");
1760            }
1761            // Starting a node should not be gated on immediate bootstrap connectivity.
1762            // Keep running and allow background discovery / retries to populate peers later.
1763            return Ok(());
1764        }
1765
1766        info!(
1767            "Successfully connected to {} bootstrap peers",
1768            successful_connections
1769        );
1770
1771        // Perform DHT peer discovery from connected bootstrap peers.
1772        match self
1773            .dht_manager()
1774            .bootstrap_from_peers(&connected_peer_ids)
1775            .await
1776        {
1777            Ok(count) => info!("DHT peer discovery found {} peers", count),
1778            Err(e) => warn!("DHT peer discovery failed: {}", e),
1779        }
1780
1781        // Mark node as bootstrapped - we have connected to bootstrap peers
1782        // and initiated peer discovery
1783        self.is_bootstrapped.store(true, Ordering::SeqCst);
1784        info!(
1785            "Bootstrap complete: connected to {} peers, initiated {} discovery requests",
1786            successful_connections,
1787            connected_peer_ids.len()
1788        );
1789
1790        Ok(())
1791    }
1792
1793    // disconnect_all_peers and periodic_tasks are now in TransportHandle
1794}
1795
1796/// Network sender trait for sending messages
1797#[async_trait::async_trait]
1798#[allow(dead_code)]
1799pub trait NetworkSender: Send + Sync {
1800    /// Send a message to an authenticated peer.
1801    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;
1802
1803    /// Get our local peer ID (cryptographic identity).
1804    fn local_peer_id(&self) -> PeerId;
1805}
1806
1807// P2PNetworkSender removed — NetworkSender is now implemented directly on TransportHandle.
1808// NodeBuilder removed — use NodeConfigBuilder + P2PNode::new() instead.
1809
1810#[cfg(test)]
1811#[allow(clippy::unwrap_used, clippy::expect_used)]
1812mod diversity_tests {
1813    use super::*;
1814    use crate::security::IPDiversityConfig;
1815
1816    async fn build_bootstrap_manager_like_prod(config: &NodeConfig) -> BootstrapManager {
1817        // Use a temp dir to avoid conflicts with cached files from old format
1818        let temp_dir = tempfile::TempDir::new().expect("temp dir");
1819        let mut bootstrap_config = config.bootstrap_cache_config.clone().unwrap_or_default();
1820        bootstrap_config.cache_dir = temp_dir.path().to_path_buf();
1821
1822        BootstrapManager::with_node_config(bootstrap_config, config)
1823            .await
1824            .expect("bootstrap manager")
1825    }
1826
1827    #[tokio::test]
1828    async fn test_nodeconfig_diversity_config_used_for_bootstrap() {
1829        let config = NodeConfig {
1830            diversity_config: Some(IPDiversityConfig::testnet()),
1831            ..Default::default()
1832        };
1833
1834        let manager = build_bootstrap_manager_like_prod(&config).await;
1835        assert!(manager.diversity_config().is_relaxed());
1836        assert_eq!(manager.diversity_config().max_nodes_per_asn, 5000);
1837    }
1838}
1839
1840/// Helper function to register a new channel
1841pub(crate) async fn register_new_channel(
1842    peers: &Arc<RwLock<HashMap<String, PeerInfo>>>,
1843    channel_id: &str,
1844    remote_addr: &MultiAddr,
1845) {
1846    let mut peers_guard = peers.write().await;
1847    let peer_info = PeerInfo {
1848        channel_id: channel_id.to_owned(),
1849        addresses: vec![remote_addr.clone()],
1850        connected_at: tokio::time::Instant::now(),
1851        last_seen: tokio::time::Instant::now(),
1852        status: ConnectionStatus::Connected,
1853        protocols: vec!["p2p-core/1.0.0".to_string()],
1854        heartbeat_count: 0,
1855    };
1856    peers_guard.insert(channel_id.to_owned(), peer_info);
1857}
1858
1859#[cfg(test)]
1860mod tests {
1861    use super::*;
1862    // MCP removed from tests
1863    use std::time::Duration;
1864    use tokio::time::timeout;
1865
1866    /// 2 MiB — used in builder tests to verify max_message_size configuration.
1867    const TEST_MAX_MESSAGE_SIZE: usize = 2 * 1024 * 1024;
1868
1869    // Test tool handler for network tests
1870
1871    // MCP removed
1872
1873    /// Helper function to create a test node configuration
1874    fn create_test_node_config() -> NodeConfig {
1875        NodeConfig {
1876            local: true,
1877            port: 0,
1878            ipv6: true,
1879            bootstrap_peers: vec![],
1880            connection_timeout: Duration::from_secs(2),
1881            max_connections: 100,
1882            dht_config: DHTConfig::default(),
1883            bootstrap_cache_config: None,
1884            diversity_config: None,
1885            max_message_size: None,
1886            node_identity: None,
1887            mode: NodeMode::default(),
1888            custom_user_agent: None,
1889            allow_loopback: true,
1890            adaptive_dht_config: AdaptiveDhtConfig::default(),
1891        }
1892    }
1893
1894    /// Helper function to create a test tool
1895    // MCP removed: test tool helper deleted
1896
1897    #[tokio::test]
1898    async fn test_node_config_default() {
1899        let config = NodeConfig::default();
1900
1901        assert_eq!(config.listen_addrs().len(), 2); // IPv4 + IPv6
1902        assert_eq!(config.max_connections, 10000);
1903        assert_eq!(config.connection_timeout, Duration::from_secs(30));
1904    }
1905
1906    #[tokio::test]
1907    async fn test_dht_config_default() {
1908        let config = DHTConfig::default();
1909
1910        assert_eq!(config.k_value, 20);
1911        assert_eq!(config.alpha_value, 5);
1912        assert_eq!(config.refresh_interval, Duration::from_secs(600));
1913    }
1914
1915    #[test]
1916    fn test_connection_status_variants() {
1917        let connecting = ConnectionStatus::Connecting;
1918        let connected = ConnectionStatus::Connected;
1919        let disconnecting = ConnectionStatus::Disconnecting;
1920        let disconnected = ConnectionStatus::Disconnected;
1921        let failed = ConnectionStatus::Failed("test error".to_string());
1922
1923        assert_eq!(connecting, ConnectionStatus::Connecting);
1924        assert_eq!(connected, ConnectionStatus::Connected);
1925        assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
1926        assert_eq!(disconnected, ConnectionStatus::Disconnected);
1927        assert_ne!(connecting, connected);
1928
1929        if let ConnectionStatus::Failed(msg) = failed {
1930            assert_eq!(msg, "test error");
1931        } else {
1932            panic!("Expected Failed status");
1933        }
1934    }
1935
1936    #[tokio::test]
1937    async fn test_node_creation() -> Result<()> {
1938        let config = create_test_node_config();
1939        let node = P2PNode::new(config).await?;
1940
1941        // PeerId is derived from the cryptographic identity (32-byte BLAKE3 hash)
1942        assert_eq!(node.peer_id().to_hex().len(), 64);
1943        assert!(!node.is_running());
1944        assert_eq!(node.peer_count().await, 0);
1945        assert!(node.connected_peers().await.is_empty());
1946
1947        Ok(())
1948    }
1949
1950    #[tokio::test]
1951    async fn test_node_lifecycle() -> Result<()> {
1952        let config = create_test_node_config();
1953        let node = P2PNode::new(config).await?;
1954
1955        // Initially not running
1956        assert!(!node.is_running());
1957
1958        // Start the node
1959        node.start().await?;
1960        assert!(node.is_running());
1961
1962        // Check listen addresses were set (at least one)
1963        let listen_addrs = node.listen_addrs().await;
1964        assert!(
1965            !listen_addrs.is_empty(),
1966            "Expected at least one listening address"
1967        );
1968
1969        // Stop the node
1970        node.stop().await?;
1971        assert!(!node.is_running());
1972
1973        Ok(())
1974    }
1975
1976    #[tokio::test]
1977    async fn test_peer_connection() -> Result<()> {
1978        let config1 = create_test_node_config();
1979        let config2 = create_test_node_config();
1980
1981        let node1 = P2PNode::new(config1).await?;
1982        let node2 = P2PNode::new(config2).await?;
1983
1984        node1.start().await?;
1985        node2.start().await?;
1986
1987        let node2_addr = node2
1988            .listen_addrs()
1989            .await
1990            .into_iter()
1991            .find(|a| a.is_ipv4())
1992            .ok_or_else(|| {
1993                P2PError::Network(crate::error::NetworkError::InvalidAddress(
1994                    "Node 2 did not expose an IPv4 listen address".into(),
1995                ))
1996            })?;
1997
1998        // Connect to a real peer (unsigned — no node_identity configured).
1999        // connect_peer returns a transport-level channel ID (String), not a PeerId.
2000        let channel_id = node1.connect_peer(&node2_addr).await?;
2001
2002        // Unauthenticated connections don't appear in the app-level peer maps.
2003        // Verify transport-level tracking via is_connection_active / peers map.
2004        assert!(node1.is_connection_active(&channel_id).await);
2005
2006        // Get peer info from the transport-level peers map (keyed by channel ID)
2007        let peer_info = node1.transport.peer_info_by_channel(&channel_id).await;
2008        assert!(peer_info.is_some());
2009        let info = peer_info.expect("Peer info should exist after connect");
2010        assert_eq!(info.channel_id, channel_id);
2011        assert_eq!(info.status, ConnectionStatus::Connected);
2012        assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
2013
2014        // Disconnect the channel
2015        node1.remove_channel(&channel_id).await;
2016        assert!(!node1.is_connection_active(&channel_id).await);
2017
2018        node1.stop().await?;
2019        node2.stop().await?;
2020
2021        Ok(())
2022    }
2023
2024    #[tokio::test]
2025    async fn test_connect_peer_rejects_tcp_multiaddr() -> Result<()> {
2026        let config = create_test_node_config();
2027        let node = P2PNode::new(config).await?;
2028
2029        let tcp_addr: MultiAddr = "/ip4/127.0.0.1/tcp/1".parse().unwrap();
2030        let result = node.connect_peer(&tcp_addr).await;
2031
2032        assert!(
2033            matches!(
2034                result,
2035                Err(P2PError::Network(
2036                    crate::error::NetworkError::InvalidAddress(_)
2037                ))
2038            ),
2039            "TCP multiaddrs should be rejected before a QUIC dial is attempted, got: {:?}",
2040            result
2041        );
2042
2043        Ok(())
2044    }
2045
2046    // TODO(windows): Investigate QUIC connection issues on Windows CI
2047    // This test consistently fails on Windows GitHub Actions runners with
2048    // "All connect attempts failed" even with IPv4-only config, long delays,
2049    // and multiple retry attempts. The underlying saorsa-transport library may have
2050    // issues on Windows that need investigation.
2051    // See: https://github.com/dirvine/saorsa-core/issues/TBD
2052    #[cfg_attr(target_os = "windows", ignore)]
2053    #[tokio::test]
2054    async fn test_event_subscription() -> Result<()> {
2055        // PeerConnected/PeerDisconnected only fire for authenticated peers
2056        // (nodes with node_identity that send signed messages).
2057        // Configure both nodes with identities so the event subscription test works.
2058        let identity1 =
2059            Arc::new(NodeIdentity::generate().expect("should generate identity for test node1"));
2060        let identity2 =
2061            Arc::new(NodeIdentity::generate().expect("should generate identity for test node2"));
2062
2063        let mut config1 = create_test_node_config();
2064        config1.ipv6 = false;
2065        config1.node_identity = Some(identity1);
2066
2067        let node2_peer_id = *identity2.peer_id();
2068        let mut config2 = create_test_node_config();
2069        config2.ipv6 = false;
2070        config2.node_identity = Some(identity2);
2071
2072        let node1 = P2PNode::new(config1).await?;
2073        let node2 = P2PNode::new(config2).await?;
2074
2075        node1.start().await?;
2076        node2.start().await?;
2077
2078        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
2079
2080        // Subscribe to node2's events (node2 will receive the signed message)
2081        let mut events = node2.subscribe_events();
2082
2083        let node2_addr = node2.local_addr().ok_or_else(|| {
2084            P2PError::Network(crate::error::NetworkError::ProtocolError(
2085                "No listening address".to_string().into(),
2086            ))
2087        })?;
2088
2089        // Connect node1 → node2
2090        let mut channel_id = None;
2091        for attempt in 0..3 {
2092            if attempt > 0 {
2093                tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
2094            }
2095            match timeout(Duration::from_secs(2), node1.connect_peer(&node2_addr)).await {
2096                Ok(Ok(id)) => {
2097                    channel_id = Some(id);
2098                    break;
2099                }
2100                Ok(Err(_)) | Err(_) => continue,
2101            }
2102        }
2103        let channel_id = channel_id.expect("Failed to connect after 3 attempts");
2104
2105        // Wait for identity exchange to complete via wait_for_peer_identity.
2106        let target_peer_id = node1
2107            .wait_for_peer_identity(&channel_id, Duration::from_secs(2))
2108            .await?;
2109        assert_eq!(target_peer_id, node2_peer_id);
2110
2111        // node1 sends a signed message → node2 authenticates → PeerConnected fires on node2
2112        node1
2113            .send_message(&target_peer_id, "test-topic", b"hello".to_vec(), &[])
2114            .await?;
2115
2116        // Check for PeerConnected event on node2
2117        let event = timeout(Duration::from_secs(2), async {
2118            loop {
2119                match events.recv().await {
2120                    Ok(P2PEvent::PeerConnected(id, _)) => return Ok(id),
2121                    Ok(P2PEvent::Message { .. }) => continue, // skip messages
2122                    Ok(_) => continue,
2123                    Err(e) => return Err(e),
2124                }
2125            }
2126        })
2127        .await;
2128        assert!(event.is_ok(), "Should receive PeerConnected event");
2129        let connected_peer_id = event.expect("Timed out").expect("Channel error");
2130        // The connected peer ID should be node1's app-level ID (a valid PeerId)
2131        assert!(
2132            connected_peer_id.0.iter().any(|&b| b != 0),
2133            "PeerConnected should carry a non-zero peer ID"
2134        );
2135
2136        node1.stop().await?;
2137        node2.stop().await?;
2138
2139        Ok(())
2140    }
2141
2142    // TODO(windows): Same QUIC connection issues as test_event_subscription
2143    #[cfg_attr(target_os = "windows", ignore)]
2144    #[tokio::test]
2145    async fn test_message_sending() -> Result<()> {
2146        // Create two nodes (IPv4-only loopback)
2147        let mut config1 = create_test_node_config();
2148        config1.ipv6 = false;
2149        let node1 = P2PNode::new(config1).await?;
2150        node1.start().await?;
2151
2152        let mut config2 = create_test_node_config();
2153        config2.ipv6 = false;
2154        let node2 = P2PNode::new(config2).await?;
2155        node2.start().await?;
2156
2157        // Wait a bit for nodes to start listening
2158        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
2159
2160        // Get actual listening address of node2
2161        let node2_addr = node2.local_addr().ok_or_else(|| {
2162            P2PError::Network(crate::error::NetworkError::ProtocolError(
2163                "No listening address".to_string().into(),
2164            ))
2165        })?;
2166
2167        // Connect node1 to node2
2168        let channel_id =
2169            match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
2170                Ok(res) => res?,
2171                Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2172            };
2173
2174        // Wait for identity exchange via wait_for_peer_identity.
2175        let target_peer_id = node1
2176            .wait_for_peer_identity(&channel_id, Duration::from_secs(2))
2177            .await?;
2178        assert_eq!(target_peer_id, node2.peer_id().clone());
2179
2180        // Send a message
2181        let message_data = b"Hello, peer!".to_vec();
2182        let result = match timeout(
2183            Duration::from_millis(500),
2184            node1.send_message(&target_peer_id, "test-protocol", message_data, &[]),
2185        )
2186        .await
2187        {
2188            Ok(res) => res,
2189            Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2190        };
2191        // For now, we'll just check that we don't get a "not connected" error
2192        // The actual send might fail due to no handler on the other side
2193        if let Err(e) = &result {
2194            assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
2195        }
2196
2197        // Try to send to non-existent peer
2198        let non_existent_peer = PeerId::from_bytes([0xFFu8; 32]);
2199        let result = node1
2200            .send_message(&non_existent_peer, "test-protocol", vec![], &[])
2201            .await;
2202        assert!(result.is_err(), "Sending to non-existent peer should fail");
2203
2204        node1.stop().await?;
2205        node2.stop().await?;
2206
2207        Ok(())
2208    }
2209
2210    #[tokio::test]
2211    async fn test_remote_mcp_operations() -> Result<()> {
2212        let config = create_test_node_config();
2213        let node = P2PNode::new(config).await?;
2214
2215        // MCP removed; test reduced to simple start/stop
2216        node.start().await?;
2217        node.stop().await?;
2218        Ok(())
2219    }
2220
2221    #[tokio::test]
2222    async fn test_health_check() -> Result<()> {
2223        let config = create_test_node_config();
2224        let node = P2PNode::new(config).await?;
2225
2226        // Health check should pass with no connections
2227        let result = node.health_check().await;
2228        assert!(result.is_ok());
2229
2230        // Note: We're not actually connecting to real peers here
2231        // since that would require running bootstrap nodes.
2232        // The health check should still pass with no connections.
2233
2234        Ok(())
2235    }
2236
2237    #[tokio::test]
2238    async fn test_node_uptime() -> Result<()> {
2239        let config = create_test_node_config();
2240        let node = P2PNode::new(config).await?;
2241
2242        let uptime1 = node.uptime();
2243        assert!(uptime1 >= Duration::from_secs(0));
2244
2245        // Wait a bit
2246        tokio::time::sleep(Duration::from_millis(10)).await;
2247
2248        let uptime2 = node.uptime();
2249        assert!(uptime2 > uptime1);
2250
2251        Ok(())
2252    }
2253
2254    #[tokio::test]
2255    async fn test_node_config_access() -> Result<()> {
2256        let config = create_test_node_config();
2257        let node = P2PNode::new(config).await?;
2258
2259        let node_config = node.config();
2260        assert_eq!(node_config.max_connections, 100);
2261        // MCP removed
2262
2263        Ok(())
2264    }
2265
2266    #[tokio::test]
2267    async fn test_mcp_server_access() -> Result<()> {
2268        let config = create_test_node_config();
2269        let _node = P2PNode::new(config).await?;
2270
2271        // MCP removed
2272        Ok(())
2273    }
2274
2275    #[tokio::test]
2276    async fn test_dht_access() -> Result<()> {
2277        let config = create_test_node_config();
2278        let node = P2PNode::new(config).await?;
2279
2280        // DHT is always available
2281        let _dht = node.dht();
2282
2283        Ok(())
2284    }
2285
2286    #[tokio::test]
2287    async fn test_node_config_builder() -> Result<()> {
2288        let bootstrap: MultiAddr = "/ip4/127.0.0.1/udp/9000/quic".parse().unwrap();
2289
2290        let config = NodeConfig::builder()
2291            .local(true)
2292            .ipv6(true)
2293            .bootstrap_peer(bootstrap)
2294            .connection_timeout(Duration::from_secs(15))
2295            .max_connections(200)
2296            .max_message_size(TEST_MAX_MESSAGE_SIZE)
2297            .build()?;
2298
2299        assert_eq!(config.listen_addrs().len(), 2); // IPv4 + IPv6
2300        assert!(config.local);
2301        assert!(config.ipv6);
2302        assert_eq!(config.bootstrap_peers.len(), 1);
2303        assert_eq!(config.connection_timeout, Duration::from_secs(15));
2304        assert_eq!(config.max_connections, 200);
2305        assert_eq!(config.max_message_size, Some(TEST_MAX_MESSAGE_SIZE));
2306        assert!(config.allow_loopback); // auto-enabled by local(true)
2307
2308        Ok(())
2309    }
2310
2311    #[tokio::test]
2312    async fn test_bootstrap_peers() -> Result<()> {
2313        let mut config = create_test_node_config();
2314        config.bootstrap_peers = vec![
2315            crate::MultiAddr::from_ipv4(std::net::Ipv4Addr::LOCALHOST, 9200),
2316            crate::MultiAddr::from_ipv4(std::net::Ipv4Addr::LOCALHOST, 9201),
2317        ];
2318
2319        let node = P2PNode::new(config).await?;
2320
2321        // Start node (which attempts to connect to bootstrap peers)
2322        node.start().await?;
2323
2324        // In a test environment, bootstrap peers may not be available
2325        // The test verifies the node starts correctly with bootstrap configuration
2326        // Peer count may include local/internal tracking, so we just verify it's reasonable
2327        let _peer_count = node.peer_count().await;
2328
2329        node.stop().await?;
2330        Ok(())
2331    }
2332
2333    #[tokio::test]
2334    async fn test_peer_info_structure() {
2335        let peer_info = PeerInfo {
2336            channel_id: "test_peer".to_string(),
2337            addresses: vec!["/ip4/127.0.0.1/tcp/9000".parse::<MultiAddr>().unwrap()],
2338            connected_at: Instant::now(),
2339            last_seen: Instant::now(),
2340            status: ConnectionStatus::Connected,
2341            protocols: vec!["test-protocol".to_string()],
2342            heartbeat_count: 0,
2343        };
2344
2345        assert_eq!(peer_info.channel_id, "test_peer");
2346        assert_eq!(peer_info.addresses.len(), 1);
2347        assert_eq!(peer_info.status, ConnectionStatus::Connected);
2348        assert_eq!(peer_info.protocols.len(), 1);
2349    }
2350
2351    #[tokio::test]
2352    async fn test_serialization() -> Result<()> {
2353        // Test that configs can be serialized/deserialized
2354        let config = create_test_node_config();
2355        let serialized = serde_json::to_string(&config)?;
2356        let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
2357
2358        assert_eq!(config.local, deserialized.local);
2359        assert_eq!(config.port, deserialized.port);
2360        assert_eq!(config.ipv6, deserialized.ipv6);
2361        assert_eq!(config.bootstrap_peers, deserialized.bootstrap_peers);
2362
2363        Ok(())
2364    }
2365
2366    #[tokio::test]
2367    async fn test_get_channel_id_by_address_found() -> Result<()> {
2368        let config = create_test_node_config();
2369        let node = P2PNode::new(config).await?;
2370
2371        // Manually insert a peer for testing
2372        let test_channel_id = "peer_test_123".to_string();
2373        let test_address = "192.168.1.100:9000";
2374        let test_multiaddr = MultiAddr::quic(test_address.parse().unwrap());
2375
2376        let peer_info = PeerInfo {
2377            channel_id: test_channel_id.clone(),
2378            addresses: vec![test_multiaddr],
2379            connected_at: Instant::now(),
2380            last_seen: Instant::now(),
2381            status: ConnectionStatus::Connected,
2382            protocols: vec!["test-protocol".to_string()],
2383            heartbeat_count: 0,
2384        };
2385
2386        node.transport
2387            .inject_peer(test_channel_id.clone(), peer_info)
2388            .await;
2389
2390        // Test: Find channel by address
2391        let lookup_addr = MultiAddr::quic(test_address.parse().unwrap());
2392        let found_channel_id = node.get_channel_id_by_address(&lookup_addr).await;
2393        assert_eq!(found_channel_id, Some(test_channel_id));
2394
2395        Ok(())
2396    }
2397
2398    #[tokio::test]
2399    async fn test_get_channel_id_by_address_not_found() -> Result<()> {
2400        let config = create_test_node_config();
2401        let node = P2PNode::new(config).await?;
2402
2403        // Test: Try to find a channel that doesn't exist
2404        let unknown_addr = MultiAddr::quic("192.168.1.200:9000".parse().unwrap());
2405        let result = node.get_channel_id_by_address(&unknown_addr).await;
2406        assert_eq!(result, None);
2407
2408        Ok(())
2409    }
2410
2411    #[tokio::test]
2412    async fn test_get_channel_id_by_address_invalid_format() -> Result<()> {
2413        let config = create_test_node_config();
2414        let node = P2PNode::new(config).await?;
2415
2416        // Test: Non-IP address should return None (no matching socket addr)
2417        let ble_addr = MultiAddr::new(crate::address::TransportAddr::Ble {
2418            mac: [0x02, 0x00, 0x00, 0x00, 0x00, 0x01],
2419            psm: 0x0025,
2420        });
2421        let result = node.get_channel_id_by_address(&ble_addr).await;
2422        assert_eq!(result, None);
2423
2424        Ok(())
2425    }
2426
2427    #[tokio::test]
2428    async fn test_get_channel_id_by_address_multiple_peers() -> Result<()> {
2429        let config = create_test_node_config();
2430        let node = P2PNode::new(config).await?;
2431
2432        // Add multiple peers with different addresses
2433        let peer1_id = "peer_1".to_string();
2434        let peer1_addr_str = "192.168.1.101:9001";
2435        let peer1_multiaddr = MultiAddr::quic(peer1_addr_str.parse().unwrap());
2436
2437        let peer2_id = "peer_2".to_string();
2438        let peer2_addr_str = "192.168.1.102:9002";
2439        let peer2_multiaddr = MultiAddr::quic(peer2_addr_str.parse().unwrap());
2440
2441        let peer1_info = PeerInfo {
2442            channel_id: peer1_id.clone(),
2443            addresses: vec![peer1_multiaddr],
2444            connected_at: Instant::now(),
2445            last_seen: Instant::now(),
2446            status: ConnectionStatus::Connected,
2447            protocols: vec!["test-protocol".to_string()],
2448            heartbeat_count: 0,
2449        };
2450
2451        let peer2_info = PeerInfo {
2452            channel_id: peer2_id.clone(),
2453            addresses: vec![peer2_multiaddr],
2454            connected_at: Instant::now(),
2455            last_seen: Instant::now(),
2456            status: ConnectionStatus::Connected,
2457            protocols: vec!["test-protocol".to_string()],
2458            heartbeat_count: 0,
2459        };
2460
2461        node.transport
2462            .inject_peer(peer1_id.clone(), peer1_info)
2463            .await;
2464        node.transport
2465            .inject_peer(peer2_id.clone(), peer2_info)
2466            .await;
2467
2468        // Test: Find each channel by their unique address
2469        let found_peer1 = node
2470            .get_channel_id_by_address(&MultiAddr::quic(peer1_addr_str.parse().unwrap()))
2471            .await;
2472        let found_peer2 = node
2473            .get_channel_id_by_address(&MultiAddr::quic(peer2_addr_str.parse().unwrap()))
2474            .await;
2475
2476        assert_eq!(found_peer1, Some(peer1_id));
2477        assert_eq!(found_peer2, Some(peer2_id));
2478
2479        Ok(())
2480    }
2481
2482    #[tokio::test]
2483    async fn test_list_active_connections_empty() -> Result<()> {
2484        let config = create_test_node_config();
2485        let node = P2PNode::new(config).await?;
2486
2487        // Test: No connections initially
2488        let connections = node.list_active_connections().await;
2489        assert!(connections.is_empty());
2490
2491        Ok(())
2492    }
2493
2494    #[tokio::test]
2495    async fn test_list_active_connections_with_peers() -> Result<()> {
2496        let config = create_test_node_config();
2497        let node = P2PNode::new(config).await?;
2498
2499        // Add multiple peers
2500        let peer1_id = "peer_1".to_string();
2501        let peer1_addrs = vec![
2502            MultiAddr::quic("192.168.1.101:9001".parse().unwrap()),
2503            MultiAddr::quic("192.168.1.101:9002".parse().unwrap()),
2504        ];
2505
2506        let peer2_id = "peer_2".to_string();
2507        let peer2_addrs = vec![MultiAddr::quic("192.168.1.102:9003".parse().unwrap())];
2508
2509        let peer1_info = PeerInfo {
2510            channel_id: peer1_id.clone(),
2511            addresses: peer1_addrs.clone(),
2512            connected_at: Instant::now(),
2513            last_seen: Instant::now(),
2514            status: ConnectionStatus::Connected,
2515            protocols: vec!["test-protocol".to_string()],
2516            heartbeat_count: 0,
2517        };
2518
2519        let peer2_info = PeerInfo {
2520            channel_id: peer2_id.clone(),
2521            addresses: peer2_addrs.clone(),
2522            connected_at: Instant::now(),
2523            last_seen: Instant::now(),
2524            status: ConnectionStatus::Connected,
2525            protocols: vec!["test-protocol".to_string()],
2526            heartbeat_count: 0,
2527        };
2528
2529        node.transport
2530            .inject_peer(peer1_id.clone(), peer1_info)
2531            .await;
2532        node.transport
2533            .inject_peer(peer2_id.clone(), peer2_info)
2534            .await;
2535
2536        // Also add to active_connections (list_active_connections iterates over this)
2537        node.transport
2538            .inject_active_connection(peer1_id.clone())
2539            .await;
2540        node.transport
2541            .inject_active_connection(peer2_id.clone())
2542            .await;
2543
2544        // Test: List all active connections
2545        let connections = node.list_active_connections().await;
2546        assert_eq!(connections.len(), 2);
2547
2548        // Verify peer1 and peer2 are in the list
2549        let peer1_conn = connections.iter().find(|(id, _)| id == &peer1_id);
2550        let peer2_conn = connections.iter().find(|(id, _)| id == &peer2_id);
2551
2552        assert!(peer1_conn.is_some());
2553        assert!(peer2_conn.is_some());
2554
2555        // Verify addresses match
2556        assert_eq!(peer1_conn.unwrap().1, peer1_addrs);
2557        assert_eq!(peer2_conn.unwrap().1, peer2_addrs);
2558
2559        Ok(())
2560    }
2561
2562    #[tokio::test]
2563    async fn test_remove_channel_success() -> Result<()> {
2564        let config = create_test_node_config();
2565        let node = P2PNode::new(config).await?;
2566
2567        // Add a peer
2568        let channel_id = "peer_to_remove".to_string();
2569        let channel_peer_id = PeerId::from_name(&channel_id);
2570        let peer_info = PeerInfo {
2571            channel_id: channel_id.clone(),
2572            addresses: vec![MultiAddr::quic("192.168.1.100:9000".parse().unwrap())],
2573            connected_at: Instant::now(),
2574            last_seen: Instant::now(),
2575            status: ConnectionStatus::Connected,
2576            protocols: vec!["test-protocol".to_string()],
2577            heartbeat_count: 0,
2578        };
2579
2580        node.transport
2581            .inject_peer(channel_id.clone(), peer_info)
2582            .await;
2583        node.transport
2584            .inject_peer_to_channel(channel_peer_id, channel_id.clone())
2585            .await;
2586
2587        // Verify peer exists
2588        assert!(node.is_peer_connected(&channel_peer_id).await);
2589
2590        // Remove the channel
2591        let removed = node.remove_channel(&channel_id).await;
2592        assert!(removed);
2593
2594        // Verify peer no longer exists
2595        assert!(!node.is_peer_connected(&channel_peer_id).await);
2596
2597        Ok(())
2598    }
2599
2600    #[tokio::test]
2601    async fn test_remove_channel_nonexistent() -> Result<()> {
2602        let config = create_test_node_config();
2603        let node = P2PNode::new(config).await?;
2604
2605        // Try to remove a channel that doesn't exist
2606        let removed = node.remove_channel("nonexistent_peer").await;
2607        assert!(!removed);
2608
2609        Ok(())
2610    }
2611
2612    #[tokio::test]
2613    async fn test_is_peer_connected() -> Result<()> {
2614        let config = create_test_node_config();
2615        let node = P2PNode::new(config).await?;
2616
2617        let channel_id = "test_peer".to_string();
2618        let channel_peer_id = PeerId::from_name(&channel_id);
2619
2620        // Initially not connected
2621        assert!(!node.is_peer_connected(&channel_peer_id).await);
2622
2623        // Add peer
2624        let peer_info = PeerInfo {
2625            channel_id: channel_id.clone(),
2626            addresses: vec![MultiAddr::quic("192.168.1.100:9000".parse().unwrap())],
2627            connected_at: Instant::now(),
2628            last_seen: Instant::now(),
2629            status: ConnectionStatus::Connected,
2630            protocols: vec!["test-protocol".to_string()],
2631            heartbeat_count: 0,
2632        };
2633
2634        node.transport
2635            .inject_peer(channel_id.clone(), peer_info)
2636            .await;
2637        node.transport
2638            .inject_peer_to_channel(channel_peer_id, channel_id.clone())
2639            .await;
2640
2641        // Now connected
2642        assert!(node.is_peer_connected(&channel_peer_id).await);
2643
2644        // Remove channel
2645        node.remove_channel(&channel_id).await;
2646
2647        // No longer connected
2648        assert!(!node.is_peer_connected(&channel_peer_id).await);
2649
2650        Ok(())
2651    }
2652
2653    #[test]
2654    fn test_normalize_ipv6_wildcard() {
2655        use std::net::{IpAddr, Ipv6Addr, SocketAddr};
2656
2657        let wildcard = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 8080);
2658        let normalized = normalize_wildcard_to_loopback(wildcard);
2659
2660        assert_eq!(normalized.ip(), IpAddr::V6(Ipv6Addr::LOCALHOST));
2661        assert_eq!(normalized.port(), 8080);
2662    }
2663
2664    #[test]
2665    fn test_normalize_ipv4_wildcard() {
2666        use std::net::{IpAddr, Ipv4Addr, SocketAddr};
2667
2668        let wildcard = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9000);
2669        let normalized = normalize_wildcard_to_loopback(wildcard);
2670
2671        assert_eq!(normalized.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
2672        assert_eq!(normalized.port(), 9000);
2673    }
2674
2675    #[test]
2676    fn test_normalize_specific_address_unchanged() {
2677        let specific: std::net::SocketAddr = "192.168.1.100:3000".parse().unwrap();
2678        let normalized = normalize_wildcard_to_loopback(specific);
2679
2680        assert_eq!(normalized, specific);
2681    }
2682
2683    #[test]
2684    fn test_normalize_loopback_unchanged() {
2685        use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
2686
2687        let loopback_v6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5000);
2688        let normalized_v6 = normalize_wildcard_to_loopback(loopback_v6);
2689        assert_eq!(normalized_v6, loopback_v6);
2690
2691        let loopback_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
2692        let normalized_v4 = normalize_wildcard_to_loopback(loopback_v4);
2693        assert_eq!(normalized_v4, loopback_v4);
2694    }
2695
2696    // ---- parse_protocol_message regression tests ----
2697
2698    /// Get current Unix timestamp for tests
2699    fn current_timestamp() -> u64 {
2700        std::time::SystemTime::now()
2701            .duration_since(std::time::UNIX_EPOCH)
2702            .map(|d| d.as_secs())
2703            .unwrap_or(0)
2704    }
2705
2706    /// Helper to create a postcard-serialized WireMessage for tests
2707    fn make_wire_bytes(protocol: &str, data: Vec<u8>, from: &str, timestamp: u64) -> Vec<u8> {
2708        let msg = WireMessage {
2709            protocol: protocol.to_string(),
2710            data,
2711            from: PeerId::from_name(from),
2712            timestamp,
2713            user_agent: String::new(),
2714            public_key: Vec::new(),
2715            signature: Vec::new(),
2716        };
2717        postcard::to_stdvec(&msg).unwrap()
2718    }
2719
2720    #[test]
2721    fn test_parse_protocol_message_uses_transport_peer_id_as_source() {
2722        // Regression: For unsigned messages, P2PEvent::Message.source must be the
2723        // transport peer ID, NOT the "from" field from the wire message.
2724        let transport_id = "abcdef0123456789";
2725        let logical_id = "spoofed-logical-id";
2726        let bytes = make_wire_bytes("test/v1", vec![1, 2, 3], logical_id, current_timestamp());
2727
2728        let parsed =
2729            parse_protocol_message(&bytes, transport_id).expect("valid message should parse");
2730
2731        // Unsigned message: no authenticated node ID
2732        assert!(parsed.authenticated_node_id.is_none());
2733
2734        match parsed.event {
2735            P2PEvent::Message {
2736                topic,
2737                source,
2738                data,
2739            } => {
2740                assert!(source.is_none(), "unsigned message source must be None");
2741                assert_eq!(topic, "test/v1");
2742                assert_eq!(data, vec![1u8, 2, 3]);
2743            }
2744            other => panic!("expected P2PEvent::Message, got {:?}", other),
2745        }
2746    }
2747
2748    #[test]
2749    fn test_parse_protocol_message_rejects_invalid_bytes() {
2750        // Random bytes that are not valid bincode should be rejected
2751        assert!(parse_protocol_message(b"not valid bincode", "peer-id").is_none());
2752    }
2753
2754    #[test]
2755    fn test_parse_protocol_message_rejects_truncated_message() {
2756        // A truncated bincode message should fail to deserialize
2757        let full_bytes = make_wire_bytes("test/v1", vec![1, 2, 3], "sender", current_timestamp());
2758        let truncated = &full_bytes[..full_bytes.len() / 2];
2759        assert!(parse_protocol_message(truncated, "peer-id").is_none());
2760    }
2761
2762    #[test]
2763    fn test_parse_protocol_message_empty_payload() {
2764        let bytes = make_wire_bytes("ping", vec![], "sender", current_timestamp());
2765
2766        let parsed = parse_protocol_message(&bytes, "transport-peer")
2767            .expect("valid message with empty data should parse");
2768
2769        match parsed.event {
2770            P2PEvent::Message { data, .. } => assert!(data.is_empty()),
2771            other => panic!("expected P2PEvent::Message, got {:?}", other),
2772        }
2773    }
2774
2775    #[test]
2776    fn test_parse_protocol_message_preserves_binary_payload() {
2777        // Verify that arbitrary byte values (including 0xFF, 0x00) survive round-trip
2778        let payload: Vec<u8> = (0..=255).collect();
2779        let bytes = make_wire_bytes("binary/v1", payload.clone(), "sender", current_timestamp());
2780
2781        let parsed = parse_protocol_message(&bytes, "peer-id")
2782            .expect("valid message with full byte range should parse");
2783
2784        match parsed.event {
2785            P2PEvent::Message { data, topic, .. } => {
2786                assert_eq!(topic, "binary/v1");
2787                assert_eq!(
2788                    data, payload,
2789                    "payload must survive bincode round-trip exactly"
2790                );
2791            }
2792            other => panic!("expected P2PEvent::Message, got {:?}", other),
2793        }
2794    }
2795
2796    #[test]
2797    fn test_parse_signed_message_verifies_and_uses_node_id() {
2798        let identity = NodeIdentity::generate().expect("should generate identity");
2799        let protocol = "test/signed";
2800        let data: Vec<u8> = vec![10, 20, 30];
2801        // The `from` field must match the PeerId derived from the public key.
2802        let from = *identity.peer_id();
2803        let timestamp = current_timestamp();
2804        let user_agent = "test/1.0";
2805
2806        // Compute signable bytes the same way create_protocol_message does
2807        let signable =
2808            postcard::to_stdvec(&(protocol, data.as_slice(), &from, timestamp, user_agent))
2809                .unwrap();
2810        let sig = identity.sign(&signable).expect("signing should succeed");
2811
2812        let msg = WireMessage {
2813            protocol: protocol.to_string(),
2814            data: data.clone(),
2815            from,
2816            timestamp,
2817            user_agent: user_agent.to_string(),
2818            public_key: identity.public_key().as_bytes().to_vec(),
2819            signature: sig.as_bytes().to_vec(),
2820        };
2821        let bytes = postcard::to_stdvec(&msg).unwrap();
2822
2823        let parsed =
2824            parse_protocol_message(&bytes, "transport-xyz").expect("signed message should parse");
2825
2826        let expected_peer_id = *identity.peer_id();
2827        assert_eq!(
2828            parsed.authenticated_node_id.as_ref(),
2829            Some(&expected_peer_id)
2830        );
2831
2832        match parsed.event {
2833            P2PEvent::Message { source, .. } => {
2834                assert_eq!(
2835                    source.as_ref(),
2836                    Some(&expected_peer_id),
2837                    "source should be the verified PeerId"
2838                );
2839            }
2840            other => panic!("expected P2PEvent::Message, got {:?}", other),
2841        }
2842    }
2843
2844    #[test]
2845    fn test_parse_message_with_bad_signature_is_rejected() {
2846        let identity = NodeIdentity::generate().expect("should generate identity");
2847        let protocol = "test/bad-sig";
2848        let data: Vec<u8> = vec![1, 2, 3];
2849        let from = *identity.peer_id();
2850        let timestamp = current_timestamp();
2851        let user_agent = "test/1.0";
2852
2853        // Sign correct signable bytes
2854        let signable =
2855            postcard::to_stdvec(&(protocol, data.as_slice(), &from, timestamp, user_agent))
2856                .unwrap();
2857        let sig = identity.sign(&signable).expect("signing should succeed");
2858
2859        // Tamper with the data (signature was over [1,2,3], not [99,99,99])
2860        let msg = WireMessage {
2861            protocol: protocol.to_string(),
2862            data: vec![99, 99, 99],
2863            from,
2864            timestamp,
2865            user_agent: user_agent.to_string(),
2866            public_key: identity.public_key().as_bytes().to_vec(),
2867            signature: sig.as_bytes().to_vec(),
2868        };
2869        let bytes = postcard::to_stdvec(&msg).unwrap();
2870
2871        assert!(
2872            parse_protocol_message(&bytes, "transport-xyz").is_none(),
2873            "message with bad signature should be rejected"
2874        );
2875    }
2876
2877    #[test]
2878    fn test_parse_message_with_mismatched_from_is_rejected() {
2879        let identity = NodeIdentity::generate().expect("should generate identity");
2880        let protocol = "test/from-mismatch";
2881        let data: Vec<u8> = vec![1, 2, 3];
2882        // Use a `from` field that does NOT match the public key's PeerId.
2883        let fake_from = PeerId::from_bytes([0xDE; 32]);
2884        let timestamp = current_timestamp();
2885        let user_agent = "test/1.0";
2886
2887        let signable =
2888            postcard::to_stdvec(&(protocol, data.as_slice(), &fake_from, timestamp, user_agent))
2889                .unwrap();
2890        let sig = identity.sign(&signable).expect("signing should succeed");
2891
2892        let msg = WireMessage {
2893            protocol: protocol.to_string(),
2894            data,
2895            from: fake_from,
2896            timestamp,
2897            user_agent: user_agent.to_string(),
2898            public_key: identity.public_key().as_bytes().to_vec(),
2899            signature: sig.as_bytes().to_vec(),
2900        };
2901        let bytes = postcard::to_stdvec(&msg).unwrap();
2902
2903        assert!(
2904            parse_protocol_message(&bytes, "transport-xyz").is_none(),
2905            "message with mismatched from field should be rejected"
2906        );
2907    }
2908}