Skip to main content

saorsa_core/
network.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: david@saorsalabs.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! Network module
15//!
16//! This module provides core networking functionality for the P2P Foundation.
17//! It handles peer connections, network events, and node lifecycle management.
18
19use crate::PeerId;
20use crate::adaptive::{AdaptiveDHT, AdaptiveDhtConfig, TrustEngine, TrustEvent};
21use crate::bootstrap::{BootstrapConfig, BootstrapManager};
22use crate::dht_network_manager::{DhtNetworkConfig, DhtNetworkManager};
23use crate::error::{NetworkError, P2PError, P2pResult as Result};
24
25use crate::MultiAddr;
26use crate::identity::node_identity::{NodeIdentity, peer_id_from_public_key};
27use crate::quantum_crypto::saorsa_transport_integration::{MlDsaPublicKey, MlDsaSignature};
28use serde::{Deserialize, Serialize};
29use std::collections::HashMap;
30use std::sync::Arc;
31use std::sync::atomic::{AtomicBool, Ordering};
32use std::time::Duration;
33use tokio::sync::{RwLock, broadcast};
34use tokio::time::Instant;
35use tokio_util::sync::CancellationToken;
36
37/// Wire protocol message format for P2P communication.
38///
39/// Serialized with postcard for compact binary encoding.
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub(crate) struct WireMessage {
42    /// Protocol/topic identifier
43    pub(crate) protocol: String,
44    /// Raw payload bytes
45    pub(crate) data: Vec<u8>,
46    /// Sender's peer ID (verified against transport-level identity)
47    pub(crate) from: PeerId,
48    /// Unix timestamp in seconds
49    pub(crate) timestamp: u64,
50    /// User agent string identifying the sender's software.
51    ///
52    /// Convention: `"node/<version>"` for full DHT participants,
53    /// `"client/<version>"` or `"<app>/<version>"` for ephemeral clients.
54    /// Included in the signed bytes — tamper-proof.
55    #[serde(default)]
56    pub(crate) user_agent: String,
57    /// Sender's ML-DSA-65 public key (1952 bytes). Empty if unsigned.
58    #[serde(default)]
59    pub(crate) public_key: Vec<u8>,
60    /// ML-DSA-65 signature over the signable bytes. Empty if unsigned.
61    #[serde(default)]
62    pub(crate) signature: Vec<u8>,
63}
64
65/// Operating mode of a P2P node.
66///
67/// Determines the default user agent and DHT participation behavior.
68/// `Node` peers participate in the DHT routing table; `Client` peers
69/// are treated as ephemeral and excluded from routing.
70#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
71pub enum NodeMode {
72    /// Full DHT-participant node that maintains routing state and routes messages.
73    #[default]
74    Node,
75    /// Ephemeral client that connects to perform operations without joining the DHT.
76    Client,
77}
78
79/// Internal listen mode controlling which network interfaces the node binds to.
80#[derive(Debug, Clone, Copy, PartialEq, Eq)]
81enum ListenMode {
82    /// Bind to all interfaces (`0.0.0.0` / `::`).
83    Public,
84    /// Bind to loopback only (`127.0.0.1` / `::1`).
85    Local,
86}
87
88/// Returns the default user agent string for the given mode.
89///
90/// - `Node` → `"node/<saorsa-core-version>"`
91/// - `Client` → `"client/<saorsa-core-version>"`
92pub fn user_agent_for_mode(mode: NodeMode) -> String {
93    let prefix = match mode {
94        NodeMode::Node => "node",
95        NodeMode::Client => "client",
96    };
97    format!("{prefix}/{}", env!("CARGO_PKG_VERSION"))
98}
99
100/// Returns `true` if the user agent identifies a full DHT participant (prefix `"node/"`).
101pub fn is_dht_participant(user_agent: &str) -> bool {
102    user_agent.starts_with("node/")
103}
104
105/// Capacity of the internal channel used by the message receiving system.
106pub(crate) const MESSAGE_RECV_CHANNEL_CAPACITY: usize = 256;
107
108/// Maximum number of concurrent in-flight request/response operations.
109pub(crate) const MAX_ACTIVE_REQUESTS: usize = 256;
110
111/// Maximum allowed timeout for a single request (5 minutes).
112pub(crate) const MAX_REQUEST_TIMEOUT: Duration = Duration::from_secs(300);
113
114/// Default listen port for the P2P node.
115const DEFAULT_LISTEN_PORT: u16 = 9000;
116
117/// Default maximum number of concurrent connections.
118const DEFAULT_MAX_CONNECTIONS: usize = 10_000;
119
120/// Default connection timeout in seconds.
121const DEFAULT_CONNECTION_TIMEOUT_SECS: u64 = 30;
122
123/// DHT max XOR distance (full 160-bit keyspace).
124const DHT_MAX_DISTANCE: u8 = 160;
125
126/// Number of cached bootstrap peers to retrieve.
127const BOOTSTRAP_PEER_BATCH_SIZE: usize = 20;
128
129/// Timeout in seconds for waiting on a bootstrap peer's identity exchange.
130const BOOTSTRAP_IDENTITY_TIMEOUT_SECS: u64 = 10;
131
132/// Serde helper — returns `true`.
133const fn default_true() -> bool {
134    true
135}
136
137/// Configuration for a P2P node
138#[derive(Debug, Clone, Serialize, Deserialize)]
139pub struct NodeConfig {
140    /// Bind to loopback only (`127.0.0.1` / `::1`).
141    ///
142    /// When `true`, the node listens on loopback addresses suitable for
143    /// local development and testing. When `false` (the default), the node
144    /// listens on all interfaces (`0.0.0.0` / `::`).
145    #[serde(default)]
146    pub local: bool,
147
148    /// Listen port. `0` means OS-assigned ephemeral port.
149    #[serde(default)]
150    pub port: u16,
151
152    /// Enable IPv6 dual-stack binding.
153    ///
154    /// When `true` (the default), both an IPv4 and an IPv6 address are
155    /// bound. When `false`, only IPv4 is used.
156    #[serde(default = "default_true")]
157    pub ipv6: bool,
158
159    /// Bootstrap peers to connect to on startup.
160    pub bootstrap_peers: Vec<crate::MultiAddr>,
161
162    // MCP removed; will be redesigned later
163    /// Connection timeout duration
164    pub connection_timeout: Duration,
165
166    /// Maximum number of concurrent connections
167    pub max_connections: usize,
168
169    /// DHT configuration
170    pub dht_config: DHTConfig,
171
172    /// Bootstrap cache configuration
173    pub bootstrap_cache_config: Option<BootstrapConfig>,
174
175    /// Optional IP diversity configuration for Sybil protection tuning.
176    ///
177    /// When set, this configuration is used by bootstrap peer discovery and
178    /// other diversity-enforcing subsystems. If `None`, defaults are used.
179    pub diversity_config: Option<crate::security::IPDiversityConfig>,
180
181    /// Optional override for the maximum application-layer message size.
182    ///
183    /// When `None`, the underlying saorsa-transport default is used.
184    #[serde(default)]
185    pub max_message_size: Option<usize>,
186
187    /// Optional node identity for app-level message signing.
188    ///
189    /// When set, outgoing messages are signed with the node's ML-DSA-65 key
190    /// and incoming signed messages are verified at the transport layer.
191    #[serde(skip)]
192    pub node_identity: Option<Arc<NodeIdentity>>,
193
194    /// Operating mode of this node.
195    ///
196    /// Determines the default user agent and DHT participation:
197    /// - `Node` → user agent `"node/<version>"`, added to DHT routing tables.
198    /// - `Client` → user agent `"client/<version>"`, treated as ephemeral.
199    #[serde(default)]
200    pub mode: NodeMode,
201
202    /// Optional custom user agent override.
203    ///
204    /// When `Some`, this value is used instead of the mode-derived default.
205    /// When `None`, the user agent is derived from [`NodeConfig::mode`].
206    #[serde(default, skip_serializing_if = "Option::is_none")]
207    pub custom_user_agent: Option<String>,
208
209    /// Allow loopback addresses (127.0.0.1, ::1) in the transport layer.
210    ///
211    /// In production, loopback addresses are rejected because they are not
212    /// routable. Enable this for local devnets and testnets where all nodes
213    /// run on the same machine.
214    ///
215    /// Default: `false`
216    #[serde(default)]
217    pub allow_loopback: bool,
218
219    /// Adaptive DHT configuration (trust-based blocking and eviction).
220    ///
221    /// Controls whether peers with low trust scores are evicted from the
222    /// routing table and blocked from DHT operations. Use
223    /// [`NodeConfigBuilder::trust_enforcement`] for a simple on/off toggle.
224    ///
225    /// Default: enabled with a block threshold of 0.15.
226    #[serde(default)]
227    pub adaptive_dht_config: AdaptiveDhtConfig,
228}
229
230/// DHT-specific configuration
231#[derive(Debug, Clone, Serialize, Deserialize)]
232pub struct DHTConfig {
233    /// Kademlia K parameter (bucket size)
234    pub k_value: usize,
235
236    /// Kademlia alpha parameter (parallelism)
237    pub alpha_value: usize,
238
239    /// DHT refresh interval
240    pub refresh_interval: Duration,
241}
242
243// ============================================================================
244// Address Construction Helpers
245// ============================================================================
246
247/// Build QUIC listen addresses based on port, IPv6 preference, and listen mode.
248///
249/// All returned addresses use the QUIC transport — the only transport
250/// currently supported for dialing. When additional transports are added,
251/// extend this function to produce addresses for those transports as well.
252///
253/// `ListenMode::Public` uses unspecified (all-interface) addresses;
254/// `ListenMode::Local` uses loopback addresses.
255#[inline]
256fn build_listen_addrs(port: u16, ipv6_enabled: bool, mode: ListenMode) -> Vec<MultiAddr> {
257    let mut addrs = Vec::with_capacity(if ipv6_enabled { 2 } else { 1 });
258
259    let (v4, v6) = match mode {
260        ListenMode::Public => (
261            std::net::Ipv4Addr::UNSPECIFIED,
262            std::net::Ipv6Addr::UNSPECIFIED,
263        ),
264        ListenMode::Local => (std::net::Ipv4Addr::LOCALHOST, std::net::Ipv6Addr::LOCALHOST),
265    };
266
267    if ipv6_enabled {
268        addrs.push(MultiAddr::quic(std::net::SocketAddr::new(
269            std::net::IpAddr::V6(v6),
270            port,
271        )));
272    }
273
274    addrs.push(MultiAddr::quic(std::net::SocketAddr::new(
275        std::net::IpAddr::V4(v4),
276        port,
277    )));
278
279    addrs
280}
281
282impl NodeConfig {
283    /// Returns the effective user agent string.
284    ///
285    /// If a custom user agent was set, returns that. Otherwise, derives
286    /// the user agent from the node's [`NodeMode`].
287    pub fn user_agent(&self) -> String {
288        self.custom_user_agent
289            .clone()
290            .unwrap_or_else(|| user_agent_for_mode(self.mode))
291    }
292
293    /// Compute the listen addresses from the configuration fields.
294    ///
295    /// The returned addresses are derived from [`local`](Self::local),
296    /// [`port`](Self::port), and [`ipv6`](Self::ipv6).
297    pub fn listen_addrs(&self) -> Vec<MultiAddr> {
298        let mode = if self.local {
299            ListenMode::Local
300        } else {
301            ListenMode::Public
302        };
303        build_listen_addrs(self.port, self.ipv6, mode)
304    }
305
306    /// Create a new NodeConfig with default values
307    ///
308    /// # Errors
309    ///
310    /// Returns an error if default addresses cannot be parsed
311    pub fn new() -> Result<Self> {
312        Ok(Self::default())
313    }
314
315    /// Create a builder for customized NodeConfig construction
316    pub fn builder() -> NodeConfigBuilder {
317        NodeConfigBuilder::default()
318    }
319}
320
321// ============================================================================
322// NodeConfig Builder Pattern
323// ============================================================================
324
325/// Builder for constructing [`NodeConfig`] with a transport-aware fluent API.
326///
327/// Defaults are chosen for quick local development:
328/// - QUIC on a random free port (`0`)
329/// - IPv6 enabled (dual-stack)
330/// - All interfaces (not local-only)
331///
332/// # Examples
333///
334/// ```rust,ignore
335/// // Simplest — QUIC on random port, IPv6 on, all interfaces
336/// let config = NodeConfig::builder().build()?;
337///
338/// // Local dev/test mode (loopback, auto-enables allow_loopback)
339/// let config = NodeConfig::builder()
340///     .local(true)
341///     .build()?;
342/// ```
343#[derive(Debug, Clone)]
344pub struct NodeConfigBuilder {
345    port: u16,
346    ipv6: bool,
347    local: bool,
348    bootstrap_peers: Vec<crate::MultiAddr>,
349    max_connections: Option<usize>,
350    connection_timeout: Option<Duration>,
351    dht_config: Option<DHTConfig>,
352    max_message_size: Option<usize>,
353    mode: NodeMode,
354    custom_user_agent: Option<String>,
355    allow_loopback: Option<bool>,
356    adaptive_dht_config: Option<AdaptiveDhtConfig>,
357}
358
359impl Default for NodeConfigBuilder {
360    fn default() -> Self {
361        Self {
362            port: 0,
363            ipv6: true,
364            local: false,
365            bootstrap_peers: Vec::new(),
366            max_connections: None,
367            connection_timeout: None,
368            dht_config: None,
369            max_message_size: None,
370            mode: NodeMode::default(),
371            custom_user_agent: None,
372            allow_loopback: None,
373            adaptive_dht_config: None,
374        }
375    }
376}
377
378impl NodeConfigBuilder {
379    /// Set the listen port. Default: `0` (random free port).
380    pub fn port(mut self, port: u16) -> Self {
381        self.port = port;
382        self
383    }
384
385    /// Enable or disable IPv6 dual-stack. Default: `true`.
386    pub fn ipv6(mut self, enabled: bool) -> Self {
387        self.ipv6 = enabled;
388        self
389    }
390
391    /// Bind to loopback only (`true`) or all interfaces (`false`).
392    ///
393    /// When `true`, automatically enables `allow_loopback` unless explicitly
394    /// overridden via [`Self::allow_loopback`].
395    ///
396    /// Default: `false` (all interfaces).
397    pub fn local(mut self, local: bool) -> Self {
398        self.local = local;
399        self
400    }
401
402    /// Add a bootstrap peer.
403    pub fn bootstrap_peer(mut self, addr: crate::MultiAddr) -> Self {
404        self.bootstrap_peers.push(addr);
405        self
406    }
407
408    /// Set maximum connections.
409    pub fn max_connections(mut self, max: usize) -> Self {
410        self.max_connections = Some(max);
411        self
412    }
413
414    /// Set connection timeout.
415    pub fn connection_timeout(mut self, timeout: Duration) -> Self {
416        self.connection_timeout = Some(timeout);
417        self
418    }
419
420    /// Set DHT configuration.
421    pub fn dht_config(mut self, config: DHTConfig) -> Self {
422        self.dht_config = Some(config);
423        self
424    }
425
426    /// Set maximum application-layer message size in bytes.
427    ///
428    /// If this method is not called, saorsa-transport's built-in default is used.
429    pub fn max_message_size(mut self, max_message_size: usize) -> Self {
430        self.max_message_size = Some(max_message_size);
431        self
432    }
433
434    /// Set the operating mode (Node or Client).
435    pub fn mode(mut self, mode: NodeMode) -> Self {
436        self.mode = mode;
437        self
438    }
439
440    /// Set a custom user agent string, overriding the mode-derived default.
441    pub fn custom_user_agent(mut self, user_agent: impl Into<String>) -> Self {
442        self.custom_user_agent = Some(user_agent.into());
443        self
444    }
445
446    /// Explicitly control whether loopback addresses are allowed in the
447    /// transport layer. When not called, `local(true)` auto-enables this;
448    /// `local(false)` defaults to `false`.
449    pub fn allow_loopback(mut self, allow: bool) -> Self {
450        self.allow_loopback = Some(allow);
451        self
452    }
453
454    /// Enable or disable trust-based peer eviction and blocking.
455    ///
456    /// When `false`, peers are never evicted from the routing table or
457    /// blocked from DHT operations based on trust scores. Trust scores
458    /// are still tracked but have no enforcement effect.
459    ///
460    /// When `true` (the default), peers whose trust score falls below the
461    /// block threshold (0.15) are immediately evicted and blocked.
462    ///
463    /// For fine-grained control over the threshold, use
464    /// [`adaptive_dht_config`](Self::adaptive_dht_config) instead.
465    pub fn trust_enforcement(mut self, enabled: bool) -> Self {
466        let threshold = if enabled {
467            AdaptiveDhtConfig::default().block_threshold
468        } else {
469            0.0
470        };
471        self.adaptive_dht_config = Some(AdaptiveDhtConfig {
472            block_threshold: threshold,
473        });
474        self
475    }
476
477    /// Set the full adaptive DHT configuration.
478    ///
479    /// Overrides any previous call to [`trust_enforcement`](Self::trust_enforcement).
480    pub fn adaptive_dht_config(mut self, config: AdaptiveDhtConfig) -> Self {
481        self.adaptive_dht_config = Some(config);
482        self
483    }
484
485    /// Build the [`NodeConfig`].
486    ///
487    /// # Errors
488    ///
489    /// Returns an error if address construction fails.
490    pub fn build(self) -> Result<NodeConfig> {
491        // local mode auto-enables allow_loopback unless explicitly overridden
492        let allow_loopback = self.allow_loopback.unwrap_or(self.local);
493
494        Ok(NodeConfig {
495            local: self.local,
496            port: self.port,
497            ipv6: self.ipv6,
498            bootstrap_peers: self.bootstrap_peers,
499            connection_timeout: self
500                .connection_timeout
501                .unwrap_or(Duration::from_secs(DEFAULT_CONNECTION_TIMEOUT_SECS)),
502            max_connections: self.max_connections.unwrap_or(DEFAULT_MAX_CONNECTIONS),
503            dht_config: self.dht_config.unwrap_or_default(),
504            bootstrap_cache_config: None,
505            diversity_config: None,
506            max_message_size: self.max_message_size,
507            node_identity: None,
508            mode: self.mode,
509            custom_user_agent: self.custom_user_agent,
510            allow_loopback,
511            adaptive_dht_config: self.adaptive_dht_config.unwrap_or_default(),
512        })
513    }
514}
515
516impl Default for NodeConfig {
517    fn default() -> Self {
518        Self {
519            local: false,
520            port: DEFAULT_LISTEN_PORT,
521            ipv6: true,
522            bootstrap_peers: Vec::new(),
523            connection_timeout: Duration::from_secs(DEFAULT_CONNECTION_TIMEOUT_SECS),
524            max_connections: DEFAULT_MAX_CONNECTIONS,
525            dht_config: DHTConfig::default(),
526            bootstrap_cache_config: None,
527            diversity_config: None,
528            max_message_size: None,
529            node_identity: None,
530            mode: NodeMode::default(),
531            custom_user_agent: None,
532            allow_loopback: false,
533            adaptive_dht_config: AdaptiveDhtConfig::default(),
534        }
535    }
536}
537
538impl DHTConfig {
539    const DEFAULT_K_VALUE: usize = 20;
540    const DEFAULT_ALPHA_VALUE: usize = 5;
541    const DEFAULT_REFRESH_INTERVAL_SECS: u64 = 600;
542}
543
544impl Default for DHTConfig {
545    fn default() -> Self {
546        Self {
547            k_value: Self::DEFAULT_K_VALUE,
548            alpha_value: Self::DEFAULT_ALPHA_VALUE,
549            refresh_interval: Duration::from_secs(Self::DEFAULT_REFRESH_INTERVAL_SECS),
550        }
551    }
552}
553
554/// Information about a connected peer
555#[derive(Debug, Clone)]
556pub struct PeerInfo {
557    /// Transport-level channel identifier (internal use only).
558    #[allow(dead_code)]
559    pub(crate) channel_id: String,
560
561    /// Peer's addresses
562    pub addresses: Vec<MultiAddr>,
563
564    /// Connection timestamp
565    pub connected_at: Instant,
566
567    /// Last seen timestamp
568    pub last_seen: Instant,
569
570    /// Connection status
571    pub status: ConnectionStatus,
572
573    /// Supported protocols
574    pub protocols: Vec<String>,
575
576    /// Number of heartbeats received
577    pub heartbeat_count: u64,
578}
579
580/// Connection status for a peer
581#[derive(Debug, Clone, PartialEq)]
582pub enum ConnectionStatus {
583    /// Connection is being established
584    Connecting,
585    /// Connection is established and active
586    Connected,
587    /// Connection is being closed
588    Disconnecting,
589    /// Connection is closed
590    Disconnected,
591    /// Connection failed
592    Failed(String),
593}
594
595/// Network events that can occur in the P2P system
596///
597/// Events are broadcast to all listeners and provide real-time
598/// notifications of network state changes and message arrivals.
599#[derive(Debug, Clone)]
600pub enum P2PEvent {
601    /// Message received from a peer on a specific topic
602    Message {
603        /// Topic or channel the message was sent on
604        topic: String,
605        /// For signed messages this is the authenticated app-level [`PeerId`];
606        /// `None` for unsigned messages.
607        source: Option<PeerId>,
608        /// Raw message data payload
609        data: Vec<u8>,
610    },
611    /// An authenticated peer has connected (first signed message verified on any channel).
612    /// The `user_agent` identifies the remote software (e.g. `"node/0.12.1"`, `"client/1.0"`).
613    PeerConnected(PeerId, String),
614    /// An authenticated peer has fully disconnected (all channels closed).
615    PeerDisconnected(PeerId),
616}
617
618/// Response from a peer to a request sent via [`P2PNode::send_request`].
619///
620/// Contains the response payload along with metadata about the responder
621/// and round-trip latency.
622#[derive(Debug, Clone)]
623pub struct PeerResponse {
624    /// The peer that sent the response.
625    pub peer_id: PeerId,
626    /// Raw response payload bytes.
627    pub data: Vec<u8>,
628    /// Round-trip latency from request to response.
629    pub latency: Duration,
630}
631
632/// Wire format for request/response correlation.
633///
634/// Wraps application payloads with a message ID and direction flag
635/// so the receive loop can route responses back to waiting callers.
636#[derive(Debug, Clone, Serialize, Deserialize)]
637pub(crate) struct RequestResponseEnvelope {
638    /// Unique identifier to correlate request ↔ response.
639    pub(crate) message_id: String,
640    /// `false` for requests, `true` for responses.
641    pub(crate) is_response: bool,
642    /// Application payload.
643    pub(crate) payload: Vec<u8>,
644}
645
646/// An in-flight request awaiting a response from a specific peer.
647pub(crate) struct PendingRequest {
648    /// Oneshot sender for delivering the response payload.
649    pub(crate) response_tx: tokio::sync::oneshot::Sender<Vec<u8>>,
650    /// The peer we expect the response from (for origin validation).
651    pub(crate) expected_peer: PeerId,
652}
653
654/// Main P2P network node that manages connections, routing, and communication
655///
656/// This struct represents a complete P2P network participant that can:
657/// - Connect to other peers via QUIC transport
658/// - Participate in distributed hash table (DHT) operations
659/// - Send and receive messages through various protocols
660/// - Handle network events and peer lifecycle
661///
662/// Transport concerns (connections, messaging, events) are delegated to
663/// [`TransportHandle`](crate::transport_handle::TransportHandle).
664pub struct P2PNode {
665    /// Node configuration
666    config: NodeConfig,
667
668    /// Our peer ID
669    peer_id: PeerId,
670
671    /// Transport handle owning all QUIC / peer / event state
672    transport: Arc<crate::transport_handle::TransportHandle>,
673
674    /// Node start time
675    start_time: Instant,
676
677    /// Shutdown token — cancelled when the node should stop
678    shutdown: CancellationToken,
679
680    /// Adaptive DHT layer — owns both the DHT manager and the trust engine.
681    /// All DHT operations and trust signals go through this component.
682    adaptive_dht: AdaptiveDHT,
683
684    /// Bootstrap cache manager for peer discovery
685    bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,
686
687    /// Bootstrap state tracking - indicates whether peer discovery has completed
688    is_bootstrapped: Arc<AtomicBool>,
689
690    /// Whether `start()` has been called (and `stop()` has not yet completed)
691    is_started: Arc<AtomicBool>,
692}
693
694/// Normalize wildcard bind addresses to localhost loopback addresses
695///
696/// saorsa-transport correctly rejects "unspecified" addresses (0.0.0.0 and [::]) for remote connections
697/// because you cannot connect TO an unspecified address - these are only valid for BINDING.
698///
699/// This function converts wildcard addresses to appropriate loopback addresses for local connections:
700/// - IPv6 [::]:port → ::1:port (IPv6 loopback)
701/// - IPv4 0.0.0.0:port → 127.0.0.1:port (IPv4 loopback)
702/// - All other addresses pass through unchanged
703///
704/// # Arguments
705/// * `addr` - The SocketAddr to normalize
706///
707/// # Returns
708/// * Normalized SocketAddr suitable for remote connections
709pub(crate) fn normalize_wildcard_to_loopback(addr: std::net::SocketAddr) -> std::net::SocketAddr {
710    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
711
712    if addr.ip().is_unspecified() {
713        // Convert unspecified addresses to loopback
714        let loopback_ip = match addr {
715            std::net::SocketAddr::V6(_) => IpAddr::V6(Ipv6Addr::LOCALHOST), // ::1
716            std::net::SocketAddr::V4(_) => IpAddr::V4(Ipv4Addr::LOCALHOST), // 127.0.0.1
717        };
718        std::net::SocketAddr::new(loopback_ip, addr.port())
719    } else {
720        // Not a wildcard address, pass through unchanged
721        addr
722    }
723}
724
725impl P2PNode {
726    /// Create a new P2P node with the given configuration
727    pub async fn new(config: NodeConfig) -> Result<Self> {
728        // Ensure a cryptographic identity exists — generate one if not provided.
729        let node_identity = match config.node_identity.clone() {
730            Some(identity) => identity,
731            None => Arc::new(NodeIdentity::generate()?),
732        };
733
734        // Derive the canonical peer ID from the cryptographic identity.
735        let peer_id = *node_identity.peer_id();
736
737        // Initialize bootstrap cache manager
738        let bootstrap_config = config.bootstrap_cache_config.clone().unwrap_or_default();
739        let bootstrap_manager =
740            match BootstrapManager::with_node_config(bootstrap_config, &config).await {
741                Ok(manager) => Some(Arc::new(RwLock::new(manager))),
742                Err(_e) => {
743                    warn!("Failed to initialize bootstrap manager: {_e}, continuing without cache");
744                    None
745                }
746            };
747
748        // Build transport handle with all transport-level concerns
749        let transport_config = crate::transport_handle::TransportConfig::from_node_config(
750            &config,
751            crate::DEFAULT_EVENT_CHANNEL_CAPACITY,
752            node_identity.clone(),
753        );
754        let transport =
755            Arc::new(crate::transport_handle::TransportHandle::new(transport_config).await?);
756
757        // Initialize AdaptiveDHT — creates the trust engine and DHT manager
758        let manager_dht_config = crate::dht::DHTConfig {
759            bucket_size: config.dht_config.k_value,
760            alpha: config.dht_config.alpha_value,
761            bucket_refresh_interval: config.dht_config.refresh_interval,
762            max_distance: DHT_MAX_DISTANCE,
763        };
764        let dht_manager_config = DhtNetworkConfig {
765            peer_id,
766            dht_config: manager_dht_config,
767            node_config: config.clone(),
768            request_timeout: config.connection_timeout,
769            max_concurrent_operations: MAX_ACTIVE_REQUESTS,
770            enable_security: true,
771            block_threshold: 0.0, // Set by AdaptiveDHT::new() from AdaptiveDhtConfig
772        };
773        let adaptive_dht = AdaptiveDHT::new(
774            transport.clone(),
775            dht_manager_config,
776            config.adaptive_dht_config.clone(),
777        )
778        .await?;
779
780        let node = Self {
781            config,
782            peer_id,
783            transport,
784            start_time: Instant::now(),
785            shutdown: CancellationToken::new(),
786            adaptive_dht,
787            bootstrap_manager,
788            is_bootstrapped: Arc::new(AtomicBool::new(false)),
789            is_started: Arc::new(AtomicBool::new(false)),
790        };
791        info!(
792            "Created P2P node with peer ID: {} (call start() to begin networking)",
793            node.peer_id
794        );
795
796        Ok(node)
797    }
798
799    /// Get the peer ID of this node.
800    pub fn peer_id(&self) -> &PeerId {
801        &self.peer_id
802    }
803
804    /// Get the transport handle for sharing with other components.
805    pub fn transport(&self) -> &Arc<crate::transport_handle::TransportHandle> {
806        &self.transport
807    }
808
809    pub fn local_addr(&self) -> Option<MultiAddr> {
810        self.transport.local_addr()
811    }
812
813    /// Check if the node has completed the initial bootstrap process
814    ///
815    /// Returns `true` if the node has successfully connected to at least one
816    /// bootstrap peer and performed peer discovery (FIND_NODE).
817    pub fn is_bootstrapped(&self) -> bool {
818        self.is_bootstrapped.load(Ordering::SeqCst)
819    }
820
821    /// Manually trigger re-bootstrap (useful for recovery or network rejoin)
822    ///
823    /// This clears the bootstrapped state and attempts to reconnect to
824    /// bootstrap peers and discover new peers.
825    pub async fn re_bootstrap(&self) -> Result<()> {
826        self.is_bootstrapped.store(false, Ordering::SeqCst);
827        self.connect_bootstrap_peers().await
828    }
829
830    // =========================================================================
831    // Trust API — delegates to AdaptiveDHT
832    // =========================================================================
833
834    /// Get the trust engine for advanced use cases
835    pub fn trust_engine(&self) -> Arc<TrustEngine> {
836        self.adaptive_dht.trust_engine().clone()
837    }
838
839    /// Report a trust event for a peer.
840    ///
841    /// Records a network-observable outcome (connection success/failure)
842    /// that the DHT layer did not record automatically. See [`TrustEvent`]
843    /// for the supported variants.
844    ///
845    /// # Example
846    ///
847    /// ```rust,ignore
848    /// use saorsa_core::adaptive::TrustEvent;
849    ///
850    /// node.report_trust_event(&peer_id, TrustEvent::SuccessfulResponse).await;
851    /// node.report_trust_event(&peer_id, TrustEvent::ConnectionFailed).await;
852    /// ```
853    pub async fn report_trust_event(&self, peer_id: &PeerId, event: TrustEvent) {
854        self.adaptive_dht.report_trust_event(peer_id, event).await;
855    }
856
857    /// Get the current trust score for a peer (0.0 to 1.0).
858    ///
859    /// Returns 0.5 (neutral) for unknown peers.
860    pub fn peer_trust(&self, peer_id: &PeerId) -> f64 {
861        self.adaptive_dht.peer_trust(peer_id)
862    }
863
864    /// Get the AdaptiveDHT component for direct access
865    pub fn adaptive_dht(&self) -> &AdaptiveDHT {
866        &self.adaptive_dht
867    }
868
869    // =========================================================================
870    // Request/Response API — Automatic Trust Feedback
871    // =========================================================================
872
873    /// Send a request to a peer and wait for a response with automatic trust reporting.
874    ///
875    /// Unlike fire-and-forget `send_message()`, this method:
876    /// 1. Wraps the payload in a `RequestResponseEnvelope` with a unique message ID
877    /// 2. Sends it on the `/rr/<protocol>` protocol prefix
878    /// 3. Waits for a matching response (or timeout)
879    /// 4. Automatically reports success or failure to the trust engine
880    ///
881    /// The remote peer's handler should call `send_response()` with the
882    /// incoming message ID to route the response back.
883    ///
884    /// # Arguments
885    ///
886    /// * `peer_id` - Target peer
887    /// * `protocol` - Application protocol name (e.g. `"peer_info"`)
888    /// * `data` - Request payload bytes
889    /// * `timeout` - Maximum time to wait for a response
890    ///
891    /// # Returns
892    ///
893    /// A [`PeerResponse`] on success, or an error on timeout / connection failure.
894    ///
895    /// # Example
896    ///
897    /// ```rust,ignore
898    /// let response = node.send_request(&peer_id, "peer_info", request_data, Duration::from_secs(10)).await?;
899    /// println!("Got {} bytes from {}", response.data.len(), response.peer_id);
900    /// ```
901    pub async fn send_request(
902        &self,
903        peer_id: &PeerId,
904        protocol: &str,
905        data: Vec<u8>,
906        timeout: Duration,
907    ) -> Result<PeerResponse> {
908        // Fail fast for blocked peers
909        if self.adaptive_dht.peer_trust(peer_id) < self.adaptive_dht.config().block_threshold {
910            return Err(P2PError::Network(crate::error::NetworkError::PeerBlocked(
911                *peer_id,
912            )));
913        }
914
915        match self
916            .transport
917            .send_request(peer_id, protocol, data, timeout)
918            .await
919        {
920            Ok(resp) => {
921                self.report_trust_event(peer_id, TrustEvent::SuccessfulResponse)
922                    .await;
923                Ok(resp)
924            }
925            Err(e) => {
926                let event = if matches!(&e, P2PError::Timeout(_)) {
927                    TrustEvent::ConnectionTimeout
928                } else {
929                    TrustEvent::ConnectionFailed
930                };
931                self.report_trust_event(peer_id, event).await;
932                Err(e)
933            }
934        }
935    }
936
937    pub async fn send_response(
938        &self,
939        peer_id: &PeerId,
940        protocol: &str,
941        message_id: &str,
942        data: Vec<u8>,
943    ) -> Result<()> {
944        self.transport
945            .send_response(peer_id, protocol, message_id, data)
946            .await
947    }
948
949    pub fn parse_request_envelope(data: &[u8]) -> Option<(String, bool, Vec<u8>)> {
950        crate::transport_handle::TransportHandle::parse_request_envelope(data)
951    }
952
953    pub async fn subscribe(&self, topic: &str) -> Result<()> {
954        self.transport.subscribe(topic).await
955    }
956
957    pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
958        self.transport.publish(topic, data).await
959    }
960
961    /// Get the node configuration
962    pub fn config(&self) -> &NodeConfig {
963        &self.config
964    }
965
966    /// Start the P2P node
967    pub async fn start(&self) -> Result<()> {
968        info!("Starting P2P node...");
969
970        // Start bootstrap manager background tasks
971        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
972            let mut manager = bootstrap_manager.write().await;
973            manager
974                .start_maintenance()
975                .map_err(|e| protocol_error(format!("Failed to start bootstrap manager: {e}")))?;
976            info!("Bootstrap cache manager started");
977        }
978
979        // Start transport listeners and message receiving
980        self.transport.start_network_listeners().await?;
981
982        // Start the adaptive DHT layer (DHT manager + trust engine)
983        self.adaptive_dht.start().await?;
984
985        // Log current listen addresses
986        let _listen_addrs = self.transport.listen_addrs().await;
987        info!("P2P node started on addresses: {:?}", _listen_addrs);
988
989        // NOTE: Message receiving is now integrated into the accept loop in start_network_listeners()
990        // The old start_message_receiving_system() is no longer needed as it competed with the accept
991        // loop for incoming connections, causing messages to be lost.
992
993        // Connect to bootstrap peers
994        self.connect_bootstrap_peers().await?;
995
996        self.is_started
997            .store(true, std::sync::atomic::Ordering::Release);
998
999        Ok(())
1000    }
1001
1002    // start_network_listeners and start_message_receiving_system
1003    // are now implemented in TransportHandle
1004
1005    /// Run the P2P node (blocks until shutdown)
1006    pub async fn run(&self) -> Result<()> {
1007        if !self.is_running() {
1008            self.start().await?;
1009        }
1010
1011        info!("P2P node running...");
1012
1013        // Block until shutdown is signalled. All background work (connection
1014        // lifecycle, DHT maintenance, EigenTrust) runs in dedicated tasks.
1015        self.shutdown.cancelled().await;
1016
1017        info!("P2P node stopped");
1018        Ok(())
1019    }
1020
1021    /// Stop the P2P node
1022    pub async fn stop(&self) -> Result<()> {
1023        info!("Stopping P2P node...");
1024
1025        // Signal the run loop to exit
1026        self.shutdown.cancel();
1027
1028        // Stop DHT layer first so leave messages can be sent while transport is still active.
1029        self.adaptive_dht.stop().await?;
1030
1031        // Stop the transport layer (shutdown endpoints, join tasks, disconnect peers)
1032        self.transport.stop().await?;
1033
1034        self.is_started
1035            .store(false, std::sync::atomic::Ordering::Release);
1036
1037        info!("P2P node stopped");
1038        Ok(())
1039    }
1040
1041    /// Graceful shutdown alias for tests
1042    pub async fn shutdown(&self) -> Result<()> {
1043        self.stop().await
1044    }
1045
1046    /// Check if the node is running
1047    pub fn is_running(&self) -> bool {
1048        self.is_started.load(std::sync::atomic::Ordering::Acquire) && !self.shutdown.is_cancelled()
1049    }
1050
1051    /// Get the current listen addresses
1052    pub async fn listen_addrs(&self) -> Vec<MultiAddr> {
1053        self.transport.listen_addrs().await
1054    }
1055
1056    /// Get connected peers
1057    pub async fn connected_peers(&self) -> Vec<PeerId> {
1058        self.transport.connected_peers().await
1059    }
1060
1061    /// Get peer count
1062    pub async fn peer_count(&self) -> usize {
1063        self.transport.peer_count().await
1064    }
1065
1066    /// Get peer info
1067    pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1068        self.transport.peer_info(peer_id).await
1069    }
1070
1071    /// Get the channel ID for a given address, if connected (internal only).
1072    #[allow(dead_code)]
1073    pub(crate) async fn get_channel_id_by_address(&self, addr: &MultiAddr) -> Option<String> {
1074        self.transport.get_channel_id_by_address(addr).await
1075    }
1076
1077    /// List all active transport-level connections (internal only).
1078    #[allow(dead_code)]
1079    pub(crate) async fn list_active_connections(&self) -> Vec<(String, Vec<MultiAddr>)> {
1080        self.transport.list_active_connections().await
1081    }
1082
1083    /// Remove a channel from the peers map (internal only).
1084    #[allow(dead_code)]
1085    pub(crate) async fn remove_channel(&self, channel_id: &str) -> bool {
1086        self.transport.remove_channel(channel_id).await
1087    }
1088
1089    /// Close a channel's QUIC connection and remove it from all tracking maps.
1090    ///
1091    /// Use when a transport-level connection was established but identity
1092    /// exchange failed, so no [`PeerId`] is available for [`disconnect_peer`].
1093    pub(crate) async fn disconnect_channel(&self, channel_id: &str) {
1094        self.transport.disconnect_channel(channel_id).await;
1095    }
1096
1097    /// Check if an authenticated peer is connected (has at least one active channel).
1098    pub async fn is_peer_connected(&self, peer_id: &PeerId) -> bool {
1099        self.transport.is_peer_connected(peer_id).await
1100    }
1101
1102    /// Connect to a peer, returning the transport-level channel ID.
1103    ///
1104    /// The returned channel ID is **not** the app-level [`PeerId`]. To obtain
1105    /// the authenticated peer identity, call
1106    /// [`wait_for_peer_identity`](Self::wait_for_peer_identity) with the
1107    /// returned channel ID.
1108    pub async fn connect_peer(&self, address: &MultiAddr) -> Result<String> {
1109        self.transport.connect_peer(address).await
1110    }
1111
1112    /// Wait for the identity exchange on `channel_id` to complete, returning
1113    /// the authenticated [`PeerId`].
1114    ///
1115    /// Use this after [`connect_peer`](Self::connect_peer) to bridge the gap
1116    /// between the transport-level channel ID and the app-level peer identity
1117    /// required by [`send_message`](Self::send_message).
1118    pub async fn wait_for_peer_identity(
1119        &self,
1120        channel_id: &str,
1121        timeout: Duration,
1122    ) -> Result<PeerId> {
1123        self.transport
1124            .wait_for_peer_identity(channel_id, timeout)
1125            .await
1126    }
1127
1128    /// Disconnect from a peer
1129    pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1130        self.transport.disconnect_peer(peer_id).await
1131    }
1132
1133    /// Check if a connection to a peer is active (internal only).
1134    #[allow(dead_code)]
1135    pub(crate) async fn is_connection_active(&self, channel_id: &str) -> bool {
1136        self.transport.is_connection_active(channel_id).await
1137    }
1138
1139    /// Send a message to an authenticated peer.
1140    pub async fn send_message(
1141        &self,
1142        peer_id: &PeerId,
1143        protocol: &str,
1144        data: Vec<u8>,
1145    ) -> Result<()> {
1146        self.transport.send_message(peer_id, protocol, data).await
1147    }
1148}
1149
1150/// Parse a postcard-encoded protocol message into a `P2PEvent::Message`.
1151///
1152/// Returns `None` if the bytes cannot be deserialized as a valid `WireMessage`.
1153///
1154/// The `from` field is a required part of the wire protocol but is **not**
1155/// used as the event source. Instead, `source` — the transport-level peer ID
1156/// derived from the authenticated QUIC connection — is used so that consumers
1157/// can pass it directly to `send_message()`. This eliminates a spoofing
1158/// vector where a peer could claim an arbitrary identity via the payload.
1159///
1160/// Maximum allowed clock skew for message timestamps (5 minutes).
1161/// This is intentionally lenient for initial deployment to accommodate nodes with
1162/// misconfigured clocks or high-latency network conditions. Can be tightened (e.g., to 60s)
1163/// once the network stabilizes and node clock synchronization improves.
1164const MAX_MESSAGE_AGE_SECS: u64 = 300;
1165/// Maximum allowed future timestamp (30 seconds to account for clock drift)
1166const MAX_FUTURE_SECS: u64 = 30;
1167
1168/// Convenience constructor for `P2PError::Network(NetworkError::ProtocolError(...))`.
1169fn protocol_error(msg: impl std::fmt::Display) -> P2PError {
1170    P2PError::Network(NetworkError::ProtocolError(msg.to_string().into()))
1171}
1172
1173/// Helper to send an event via a broadcast sender, logging at trace level if no receivers.
1174pub(crate) fn broadcast_event(tx: &broadcast::Sender<P2PEvent>, event: P2PEvent) {
1175    if let Err(_e) = tx.send(event) {
1176        trace!("Event broadcast has no receivers: {_e}");
1177    }
1178}
1179
1180/// Result of parsing a protocol message, including optional authenticated identity.
1181pub(crate) struct ParsedMessage {
1182    /// The P2P event to broadcast.
1183    pub(crate) event: P2PEvent,
1184    /// If the message was signed and verified, the authenticated app-level [`PeerId`].
1185    pub(crate) authenticated_node_id: Option<PeerId>,
1186    /// The sender's user agent string from the wire message.
1187    pub(crate) user_agent: String,
1188}
1189
1190pub(crate) fn parse_protocol_message(bytes: &[u8], _source: &str) -> Option<ParsedMessage> {
1191    let message: WireMessage = postcard::from_bytes(bytes).ok()?;
1192
1193    // Validate timestamp to prevent replay attacks
1194    let now = std::time::SystemTime::now()
1195        .duration_since(std::time::UNIX_EPOCH)
1196        .map(|d| d.as_secs())
1197        .unwrap_or(0);
1198
1199    // Reject messages that are too old (potential replay)
1200    if message.timestamp < now.saturating_sub(MAX_MESSAGE_AGE_SECS) {
1201        warn!(
1202            "Rejecting stale message from {} (timestamp {} is {} seconds old)",
1203            _source,
1204            message.timestamp,
1205            now.saturating_sub(message.timestamp)
1206        );
1207        return None;
1208    }
1209
1210    // Reject messages too far in the future (clock manipulation)
1211    if message.timestamp > now + MAX_FUTURE_SECS {
1212        warn!(
1213            "Rejecting future-dated message from {} (timestamp {} is {} seconds ahead)",
1214            _source,
1215            message.timestamp,
1216            message.timestamp.saturating_sub(now)
1217        );
1218        return None;
1219    }
1220
1221    // Verify app-level signature if present
1222    let authenticated_node_id = if !message.signature.is_empty() {
1223        match verify_message_signature(&message) {
1224            Ok(peer_id) => {
1225                debug!(
1226                    "Message from {} authenticated as app-level NodeId {}",
1227                    _source, peer_id
1228                );
1229                Some(peer_id)
1230            }
1231            Err(_e) => {
1232                warn!(
1233                    "Rejecting message from {}: signature verification failed: {}",
1234                    _source, _e
1235                );
1236                return None;
1237            }
1238        }
1239    } else {
1240        None
1241    };
1242
1243    debug!(
1244        "Parsed P2PEvent::Message - topic: {}, source: {:?} (transport: {}, logical: {}), payload_len: {}",
1245        message.protocol,
1246        authenticated_node_id,
1247        _source,
1248        message.from,
1249        message.data.len()
1250    );
1251
1252    Some(ParsedMessage {
1253        event: P2PEvent::Message {
1254            topic: message.protocol,
1255            source: authenticated_node_id,
1256            data: message.data,
1257        },
1258        authenticated_node_id,
1259        user_agent: message.user_agent,
1260    })
1261}
1262
1263/// Verify the ML-DSA-65 signature on a WireMessage and return the authenticated [`PeerId`].
1264///
1265/// Besides verifying the cryptographic signature, this also checks that the
1266/// self-asserted `from` field matches the [`PeerId`] derived from the public
1267/// key. This prevents a sender from signing with their real key while
1268/// claiming a different identity in the `from` field.
1269fn verify_message_signature(message: &WireMessage) -> std::result::Result<PeerId, String> {
1270    let pubkey = MlDsaPublicKey::from_bytes(&message.public_key)
1271        .map_err(|e| format!("invalid public key: {e:?}"))?;
1272
1273    let peer_id = peer_id_from_public_key(&pubkey);
1274
1275    // Validate that the self-asserted `from` field matches the public key.
1276    if message.from != peer_id {
1277        return Err(format!(
1278            "from field mismatch: message claims '{}' but public key derives '{}'",
1279            message.from, peer_id
1280        ));
1281    }
1282
1283    let signable = postcard::to_stdvec(&(
1284        &message.protocol,
1285        &message.data as &[u8],
1286        &message.from,
1287        message.timestamp,
1288        &message.user_agent,
1289    ))
1290    .map_err(|e| format!("failed to serialize signable bytes: {e}"))?;
1291
1292    let sig = MlDsaSignature::from_bytes(&message.signature)
1293        .map_err(|e| format!("invalid signature: {e:?}"))?;
1294
1295    let valid = crate::quantum_crypto::ml_dsa_verify(&pubkey, &signable, &sig)
1296        .map_err(|e| format!("verification error: {e}"))?;
1297
1298    if valid {
1299        Ok(peer_id)
1300    } else {
1301        Err("signature is invalid".to_string())
1302    }
1303}
1304
1305impl P2PNode {
1306    /// Subscribe to network events
1307    pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1308        self.transport.subscribe_events()
1309    }
1310
1311    /// Backwards-compat event stream accessor for tests
1312    pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1313        self.subscribe_events()
1314    }
1315
1316    /// Get node uptime
1317    pub fn uptime(&self) -> Duration {
1318        self.start_time.elapsed()
1319    }
1320
1321    // MCP removed: all MCP tool/service methods removed
1322
1323    // /// Handle MCP remote tool call with network integration
1324
1325    // /// List tools available on a specific remote peer
1326
1327    // /// Get MCP server statistics
1328
1329    // Background tasks (connection_lifecycle_monitor, keepalive, periodic_maintenance)
1330    // are now implemented in TransportHandle.
1331
1332    /// Check system health
1333    pub async fn health_check(&self) -> Result<()> {
1334        let peer_count = self.peer_count().await;
1335        if peer_count > self.config.max_connections {
1336            Err(protocol_error(format!(
1337                "Too many connections: {peer_count}"
1338            )))
1339        } else {
1340            Ok(())
1341        }
1342    }
1343
1344    /// Get the attached DHT manager.
1345    pub fn dht_manager(&self) -> &Arc<DhtNetworkManager> {
1346        self.adaptive_dht.dht_manager()
1347    }
1348
1349    /// Backwards-compatible alias for `dht_manager()`.
1350    pub fn dht(&self) -> &Arc<DhtNetworkManager> {
1351        self.dht_manager()
1352    }
1353
1354    /// Add a discovered peer to the bootstrap cache
1355    pub async fn add_discovered_peer(
1356        &self,
1357        _peer_id: PeerId,
1358        addresses: Vec<MultiAddr>,
1359    ) -> Result<()> {
1360        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1361            let manager = bootstrap_manager.read().await;
1362            let socket_addresses: Vec<std::net::SocketAddr> = addresses
1363                .iter()
1364                .filter_map(|addr| addr.socket_addr())
1365                .collect();
1366            if let Some(&primary) = socket_addresses.first() {
1367                manager
1368                    .add_peer(&primary, socket_addresses)
1369                    .await
1370                    .map_err(|e| {
1371                        protocol_error(format!("Failed to add peer to bootstrap cache: {e}"))
1372                    })?;
1373            }
1374        }
1375        Ok(())
1376    }
1377
1378    /// Update connection metrics for a peer in the bootstrap cache
1379    pub async fn update_peer_metrics(
1380        &self,
1381        addr: &MultiAddr,
1382        success: bool,
1383        latency_ms: Option<u64>,
1384        _error: Option<String>,
1385    ) -> Result<()> {
1386        if let Some(ref bootstrap_manager) = self.bootstrap_manager
1387            && let Some(sa) = addr.socket_addr()
1388        {
1389            let manager = bootstrap_manager.read().await;
1390            if success {
1391                let rtt_ms = latency_ms.unwrap_or(0) as u32;
1392                manager.record_success(&sa, rtt_ms).await;
1393            } else {
1394                manager.record_failure(&sa).await;
1395            }
1396        }
1397        Ok(())
1398    }
1399
1400    /// Get bootstrap cache statistics
1401    pub async fn get_bootstrap_cache_stats(
1402        &self,
1403    ) -> Result<Option<crate::bootstrap::BootstrapStats>> {
1404        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1405            let manager = bootstrap_manager.read().await;
1406            Ok(Some(manager.stats().await))
1407        } else {
1408            Ok(None)
1409        }
1410    }
1411
1412    /// Get the number of cached bootstrap peers
1413    pub async fn cached_peer_count(&self) -> usize {
1414        if let Some(ref _bootstrap_manager) = self.bootstrap_manager
1415            && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
1416        {
1417            return stats.total_peers;
1418        }
1419        0
1420    }
1421
1422    /// Connect to bootstrap peers and perform initial peer discovery
1423    async fn connect_bootstrap_peers(&self) -> Result<()> {
1424        // Each entry is a list of addresses for a single peer.
1425        let mut bootstrap_addr_sets: Vec<Vec<MultiAddr>> = Vec::new();
1426        let mut used_cache = false;
1427        let mut seen_addresses = std::collections::HashSet::new();
1428
1429        // Configured bootstrap peers take priority -- always include them first.
1430        if !self.config.bootstrap_peers.is_empty() {
1431            info!(
1432                "Using {} configured bootstrap peers (priority)",
1433                self.config.bootstrap_peers.len()
1434            );
1435            for multiaddr in &self.config.bootstrap_peers {
1436                let Some(socket_addr) = multiaddr.dialable_socket_addr() else {
1437                    warn!("Skipping non-QUIC bootstrap peer: {}", multiaddr);
1438                    continue;
1439                };
1440                seen_addresses.insert(socket_addr);
1441                bootstrap_addr_sets.push(vec![multiaddr.clone()]);
1442            }
1443        }
1444
1445        // Supplement with cached bootstrap peers (after CLI peers)
1446        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1447            let manager = bootstrap_manager.read().await;
1448            let cached_peers = manager.select_peers(BOOTSTRAP_PEER_BATCH_SIZE).await;
1449            if !cached_peers.is_empty() {
1450                let mut added_from_cache = 0;
1451                for cached in cached_peers {
1452                    let mut addrs = vec![cached.primary_address];
1453                    addrs.extend(cached.addresses);
1454                    // Only add addresses we haven't seen from CLI peers
1455                    let new_addresses: Vec<MultiAddr> = addrs
1456                        .into_iter()
1457                        .filter(|a| !seen_addresses.contains(a))
1458                        .map(MultiAddr::quic)
1459                        .collect();
1460
1461                    if !new_addresses.is_empty() {
1462                        for addr in &new_addresses {
1463                            if let Some(sa) = addr.socket_addr() {
1464                                seen_addresses.insert(sa);
1465                            }
1466                        }
1467                        bootstrap_addr_sets.push(new_addresses);
1468                        added_from_cache += 1;
1469                    }
1470                }
1471                if added_from_cache > 0 {
1472                    info!(
1473                        "Added {} cached bootstrap peers (supplementing CLI peers)",
1474                        added_from_cache
1475                    );
1476                    used_cache = true;
1477                }
1478            }
1479        }
1480
1481        if bootstrap_addr_sets.is_empty() {
1482            info!("No bootstrap peers configured and no cached peers available");
1483            return Ok(());
1484        }
1485
1486        // Connect to bootstrap peers, wait for identity exchange, then
1487        // perform DHT peer discovery using the real cryptographic PeerIds.
1488        let identity_timeout = Duration::from_secs(BOOTSTRAP_IDENTITY_TIMEOUT_SECS);
1489        let mut successful_connections = 0;
1490        let mut connected_peer_ids: Vec<PeerId> = Vec::new();
1491
1492        for addrs in &bootstrap_addr_sets {
1493            for addr in addrs {
1494                match self.connect_peer(addr).await {
1495                    Ok(channel_id) => {
1496                        // Wait for the remote peer's signed identity announce
1497                        // so we get a real cryptographic PeerId.
1498                        match self
1499                            .transport
1500                            .wait_for_peer_identity(&channel_id, identity_timeout)
1501                            .await
1502                        {
1503                            Ok(real_peer_id) => {
1504                                successful_connections += 1;
1505                                connected_peer_ids.push(real_peer_id);
1506
1507                                // Update bootstrap cache with successful connection
1508                                if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1509                                    let manager = bootstrap_manager.read().await;
1510                                    if let Some(sa) = addr.socket_addr() {
1511                                        manager.record_success(&sa, 100).await;
1512                                    }
1513                                }
1514                                break; // Successfully connected, move to next peer
1515                            }
1516                            Err(_e) => {
1517                                warn!(
1518                                    "Timeout waiting for identity from bootstrap peer {}: {}, \
1519                                     closing channel {}",
1520                                    addr, _e, channel_id
1521                                );
1522                                self.disconnect_channel(&channel_id).await;
1523                            }
1524                        }
1525                    }
1526                    Err(_e) => {
1527                        warn!("Failed to connect to bootstrap peer {}: {}", addr, _e);
1528
1529                        // Update bootstrap cache with failed connection
1530                        if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
1531                            let manager = bootstrap_manager.read().await;
1532                            if let Some(sa) = addr.socket_addr() {
1533                                manager.record_failure(&sa).await;
1534                            }
1535                        }
1536                    }
1537                }
1538            }
1539        }
1540
1541        if successful_connections == 0 {
1542            if !used_cache {
1543                warn!("Failed to connect to any bootstrap peers");
1544            }
1545            // Starting a node should not be gated on immediate bootstrap connectivity.
1546            // Keep running and allow background discovery / retries to populate peers later.
1547            return Ok(());
1548        }
1549
1550        info!(
1551            "Successfully connected to {} bootstrap peers",
1552            successful_connections
1553        );
1554
1555        // Perform DHT peer discovery from connected bootstrap peers.
1556        match self
1557            .dht_manager()
1558            .bootstrap_from_peers(&connected_peer_ids)
1559            .await
1560        {
1561            Ok(_count) => info!("DHT peer discovery found {} peers", _count),
1562            Err(_e) => warn!("DHT peer discovery failed: {}", _e),
1563        }
1564
1565        // Mark node as bootstrapped - we have connected to bootstrap peers
1566        // and initiated peer discovery
1567        self.is_bootstrapped.store(true, Ordering::SeqCst);
1568        info!(
1569            "Bootstrap complete: connected to {} peers, initiated {} discovery requests",
1570            successful_connections,
1571            connected_peer_ids.len()
1572        );
1573
1574        Ok(())
1575    }
1576
1577    // disconnect_all_peers and periodic_tasks are now in TransportHandle
1578}
1579
1580/// Network sender trait for sending messages
1581#[async_trait::async_trait]
1582#[allow(dead_code)]
1583pub trait NetworkSender: Send + Sync {
1584    /// Send a message to an authenticated peer.
1585    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;
1586
1587    /// Get our local peer ID (cryptographic identity).
1588    fn local_peer_id(&self) -> PeerId;
1589}
1590
1591// P2PNetworkSender removed — NetworkSender is now implemented directly on TransportHandle.
1592// NodeBuilder removed — use NodeConfigBuilder + P2PNode::new() instead.
1593
1594#[cfg(test)]
1595#[allow(clippy::unwrap_used, clippy::expect_used)]
1596mod diversity_tests {
1597    use super::*;
1598    use crate::security::IPDiversityConfig;
1599
1600    async fn build_bootstrap_manager_like_prod(config: &NodeConfig) -> BootstrapManager {
1601        // Use a temp dir to avoid conflicts with cached files from old format
1602        let temp_dir = tempfile::TempDir::new().expect("temp dir");
1603        let mut bootstrap_config = config.bootstrap_cache_config.clone().unwrap_or_default();
1604        bootstrap_config.cache_dir = temp_dir.path().to_path_buf();
1605
1606        BootstrapManager::with_node_config(bootstrap_config, config)
1607            .await
1608            .expect("bootstrap manager")
1609    }
1610
1611    #[tokio::test]
1612    async fn test_nodeconfig_diversity_config_used_for_bootstrap() {
1613        let config = NodeConfig {
1614            diversity_config: Some(IPDiversityConfig::testnet()),
1615            ..Default::default()
1616        };
1617
1618        let manager = build_bootstrap_manager_like_prod(&config).await;
1619        assert!(manager.diversity_config().is_relaxed());
1620        assert_eq!(manager.diversity_config().max_nodes_per_asn, 5000);
1621    }
1622}
1623
1624/// Helper function to register a new channel
1625pub(crate) async fn register_new_channel(
1626    peers: &Arc<RwLock<HashMap<String, PeerInfo>>>,
1627    channel_id: &str,
1628    remote_addr: &MultiAddr,
1629) {
1630    let mut peers_guard = peers.write().await;
1631    let peer_info = PeerInfo {
1632        channel_id: channel_id.to_owned(),
1633        addresses: vec![remote_addr.clone()],
1634        connected_at: tokio::time::Instant::now(),
1635        last_seen: tokio::time::Instant::now(),
1636        status: ConnectionStatus::Connected,
1637        protocols: vec!["p2p-core/1.0.0".to_string()],
1638        heartbeat_count: 0,
1639    };
1640    peers_guard.insert(channel_id.to_owned(), peer_info);
1641}
1642
1643#[cfg(test)]
1644mod tests {
1645    use super::*;
1646    // MCP removed from tests
1647    use std::time::Duration;
1648    use tokio::time::timeout;
1649
1650    /// 2 MiB — used in builder tests to verify max_message_size configuration.
1651    const TEST_MAX_MESSAGE_SIZE: usize = 2 * 1024 * 1024;
1652
1653    // Test tool handler for network tests
1654
1655    // MCP removed
1656
1657    /// Helper function to create a test node configuration
1658    fn create_test_node_config() -> NodeConfig {
1659        NodeConfig {
1660            local: true,
1661            port: 0,
1662            ipv6: true,
1663            bootstrap_peers: vec![],
1664            connection_timeout: Duration::from_secs(2),
1665            max_connections: 100,
1666            dht_config: DHTConfig::default(),
1667            bootstrap_cache_config: None,
1668            diversity_config: None,
1669            max_message_size: None,
1670            node_identity: None,
1671            mode: NodeMode::default(),
1672            custom_user_agent: None,
1673            allow_loopback: true,
1674            adaptive_dht_config: AdaptiveDhtConfig::default(),
1675        }
1676    }
1677
1678    /// Helper function to create a test tool
1679    // MCP removed: test tool helper deleted
1680
1681    #[tokio::test]
1682    async fn test_node_config_default() {
1683        let config = NodeConfig::default();
1684
1685        assert_eq!(config.listen_addrs().len(), 2); // IPv4 + IPv6
1686        assert_eq!(config.max_connections, 10000);
1687        assert_eq!(config.connection_timeout, Duration::from_secs(30));
1688    }
1689
1690    #[tokio::test]
1691    async fn test_dht_config_default() {
1692        let config = DHTConfig::default();
1693
1694        assert_eq!(config.k_value, 20);
1695        assert_eq!(config.alpha_value, 5);
1696        assert_eq!(config.refresh_interval, Duration::from_secs(600));
1697    }
1698
1699    #[test]
1700    fn test_connection_status_variants() {
1701        let connecting = ConnectionStatus::Connecting;
1702        let connected = ConnectionStatus::Connected;
1703        let disconnecting = ConnectionStatus::Disconnecting;
1704        let disconnected = ConnectionStatus::Disconnected;
1705        let failed = ConnectionStatus::Failed("test error".to_string());
1706
1707        assert_eq!(connecting, ConnectionStatus::Connecting);
1708        assert_eq!(connected, ConnectionStatus::Connected);
1709        assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
1710        assert_eq!(disconnected, ConnectionStatus::Disconnected);
1711        assert_ne!(connecting, connected);
1712
1713        if let ConnectionStatus::Failed(msg) = failed {
1714            assert_eq!(msg, "test error");
1715        } else {
1716            panic!("Expected Failed status");
1717        }
1718    }
1719
1720    #[tokio::test]
1721    async fn test_node_creation() -> Result<()> {
1722        let config = create_test_node_config();
1723        let node = P2PNode::new(config).await?;
1724
1725        // PeerId is derived from the cryptographic identity (32-byte BLAKE3 hash)
1726        assert_eq!(node.peer_id().to_hex().len(), 64);
1727        assert!(!node.is_running());
1728        assert_eq!(node.peer_count().await, 0);
1729        assert!(node.connected_peers().await.is_empty());
1730
1731        Ok(())
1732    }
1733
1734    #[tokio::test]
1735    async fn test_node_lifecycle() -> Result<()> {
1736        let config = create_test_node_config();
1737        let node = P2PNode::new(config).await?;
1738
1739        // Initially not running
1740        assert!(!node.is_running());
1741
1742        // Start the node
1743        node.start().await?;
1744        assert!(node.is_running());
1745
1746        // Check listen addresses were set (at least one)
1747        let listen_addrs = node.listen_addrs().await;
1748        assert!(
1749            !listen_addrs.is_empty(),
1750            "Expected at least one listening address"
1751        );
1752
1753        // Stop the node
1754        node.stop().await?;
1755        assert!(!node.is_running());
1756
1757        Ok(())
1758    }
1759
1760    #[tokio::test]
1761    async fn test_peer_connection() -> Result<()> {
1762        let config1 = create_test_node_config();
1763        let config2 = create_test_node_config();
1764
1765        let node1 = P2PNode::new(config1).await?;
1766        let node2 = P2PNode::new(config2).await?;
1767
1768        node1.start().await?;
1769        node2.start().await?;
1770
1771        let node2_addr = node2
1772            .listen_addrs()
1773            .await
1774            .into_iter()
1775            .find(|a| a.is_ipv4())
1776            .ok_or_else(|| {
1777                P2PError::Network(crate::error::NetworkError::InvalidAddress(
1778                    "Node 2 did not expose an IPv4 listen address".into(),
1779                ))
1780            })?;
1781
1782        // Connect to a real peer (unsigned — no node_identity configured).
1783        // connect_peer returns a transport-level channel ID (String), not a PeerId.
1784        let channel_id = node1.connect_peer(&node2_addr).await?;
1785
1786        // Unauthenticated connections don't appear in the app-level peer maps.
1787        // Verify transport-level tracking via is_connection_active / peers map.
1788        assert!(node1.is_connection_active(&channel_id).await);
1789
1790        // Get peer info from the transport-level peers map (keyed by channel ID)
1791        let peer_info = node1.transport.peer_info_by_channel(&channel_id).await;
1792        assert!(peer_info.is_some());
1793        let info = peer_info.expect("Peer info should exist after connect");
1794        assert_eq!(info.channel_id, channel_id);
1795        assert_eq!(info.status, ConnectionStatus::Connected);
1796        assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
1797
1798        // Disconnect the channel
1799        node1.remove_channel(&channel_id).await;
1800        assert!(!node1.is_connection_active(&channel_id).await);
1801
1802        node1.stop().await?;
1803        node2.stop().await?;
1804
1805        Ok(())
1806    }
1807
1808    #[tokio::test]
1809    async fn test_connect_peer_rejects_tcp_multiaddr() -> Result<()> {
1810        let config = create_test_node_config();
1811        let node = P2PNode::new(config).await?;
1812
1813        let tcp_addr: MultiAddr = "/ip4/127.0.0.1/tcp/1".parse().unwrap();
1814        let result = node.connect_peer(&tcp_addr).await;
1815
1816        assert!(
1817            matches!(
1818                result,
1819                Err(P2PError::Network(
1820                    crate::error::NetworkError::InvalidAddress(_)
1821                ))
1822            ),
1823            "TCP multiaddrs should be rejected before a QUIC dial is attempted, got: {:?}",
1824            result
1825        );
1826
1827        Ok(())
1828    }
1829
1830    // TODO(windows): Investigate QUIC connection issues on Windows CI
1831    // This test consistently fails on Windows GitHub Actions runners with
1832    // "All connect attempts failed" even with IPv4-only config, long delays,
1833    // and multiple retry attempts. The underlying saorsa-transport library may have
1834    // issues on Windows that need investigation.
1835    // See: https://github.com/dirvine/saorsa-core/issues/TBD
1836    #[cfg_attr(target_os = "windows", ignore)]
1837    #[tokio::test]
1838    async fn test_event_subscription() -> Result<()> {
1839        // PeerConnected/PeerDisconnected only fire for authenticated peers
1840        // (nodes with node_identity that send signed messages).
1841        // Configure both nodes with identities so the event subscription test works.
1842        let identity1 =
1843            Arc::new(NodeIdentity::generate().expect("should generate identity for test node1"));
1844        let identity2 =
1845            Arc::new(NodeIdentity::generate().expect("should generate identity for test node2"));
1846
1847        let mut config1 = create_test_node_config();
1848        config1.ipv6 = false;
1849        config1.node_identity = Some(identity1);
1850
1851        let node2_peer_id = *identity2.peer_id();
1852        let mut config2 = create_test_node_config();
1853        config2.ipv6 = false;
1854        config2.node_identity = Some(identity2);
1855
1856        let node1 = P2PNode::new(config1).await?;
1857        let node2 = P2PNode::new(config2).await?;
1858
1859        node1.start().await?;
1860        node2.start().await?;
1861
1862        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
1863
1864        // Subscribe to node2's events (node2 will receive the signed message)
1865        let mut events = node2.subscribe_events();
1866
1867        let node2_addr = node2.local_addr().ok_or_else(|| {
1868            P2PError::Network(crate::error::NetworkError::ProtocolError(
1869                "No listening address".to_string().into(),
1870            ))
1871        })?;
1872
1873        // Connect node1 → node2
1874        let mut channel_id = None;
1875        for attempt in 0..3 {
1876            if attempt > 0 {
1877                tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
1878            }
1879            match timeout(Duration::from_secs(2), node1.connect_peer(&node2_addr)).await {
1880                Ok(Ok(id)) => {
1881                    channel_id = Some(id);
1882                    break;
1883                }
1884                Ok(Err(_)) | Err(_) => continue,
1885            }
1886        }
1887        let channel_id = channel_id.expect("Failed to connect after 3 attempts");
1888
1889        // Wait for identity exchange to complete via wait_for_peer_identity.
1890        let target_peer_id = node1
1891            .wait_for_peer_identity(&channel_id, Duration::from_secs(2))
1892            .await?;
1893        assert_eq!(target_peer_id, node2_peer_id);
1894
1895        // node1 sends a signed message → node2 authenticates → PeerConnected fires on node2
1896        node1
1897            .send_message(&target_peer_id, "test-topic", b"hello".to_vec())
1898            .await?;
1899
1900        // Check for PeerConnected event on node2
1901        let event = timeout(Duration::from_secs(2), async {
1902            loop {
1903                match events.recv().await {
1904                    Ok(P2PEvent::PeerConnected(id, _)) => return Ok(id),
1905                    Ok(P2PEvent::Message { .. }) => continue, // skip messages
1906                    Ok(_) => continue,
1907                    Err(e) => return Err(e),
1908                }
1909            }
1910        })
1911        .await;
1912        assert!(event.is_ok(), "Should receive PeerConnected event");
1913        let connected_peer_id = event.expect("Timed out").expect("Channel error");
1914        // The connected peer ID should be node1's app-level ID (a valid PeerId)
1915        assert!(
1916            connected_peer_id.0.iter().any(|&b| b != 0),
1917            "PeerConnected should carry a non-zero peer ID"
1918        );
1919
1920        node1.stop().await?;
1921        node2.stop().await?;
1922
1923        Ok(())
1924    }
1925
1926    // TODO(windows): Same QUIC connection issues as test_event_subscription
1927    #[cfg_attr(target_os = "windows", ignore)]
1928    #[tokio::test]
1929    async fn test_message_sending() -> Result<()> {
1930        // Create two nodes (IPv4-only loopback)
1931        let mut config1 = create_test_node_config();
1932        config1.ipv6 = false;
1933        let node1 = P2PNode::new(config1).await?;
1934        node1.start().await?;
1935
1936        let mut config2 = create_test_node_config();
1937        config2.ipv6 = false;
1938        let node2 = P2PNode::new(config2).await?;
1939        node2.start().await?;
1940
1941        // Wait a bit for nodes to start listening
1942        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1943
1944        // Get actual listening address of node2
1945        let node2_addr = node2.local_addr().ok_or_else(|| {
1946            P2PError::Network(crate::error::NetworkError::ProtocolError(
1947                "No listening address".to_string().into(),
1948            ))
1949        })?;
1950
1951        // Connect node1 to node2
1952        let channel_id =
1953            match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
1954                Ok(res) => res?,
1955                Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
1956            };
1957
1958        // Wait for identity exchange via wait_for_peer_identity.
1959        let target_peer_id = node1
1960            .wait_for_peer_identity(&channel_id, Duration::from_secs(2))
1961            .await?;
1962        assert_eq!(target_peer_id, node2.peer_id().clone());
1963
1964        // Send a message
1965        let message_data = b"Hello, peer!".to_vec();
1966        let result = match timeout(
1967            Duration::from_millis(500),
1968            node1.send_message(&target_peer_id, "test-protocol", message_data),
1969        )
1970        .await
1971        {
1972            Ok(res) => res,
1973            Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
1974        };
1975        // For now, we'll just check that we don't get a "not connected" error
1976        // The actual send might fail due to no handler on the other side
1977        if let Err(e) = &result {
1978            assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
1979        }
1980
1981        // Try to send to non-existent peer
1982        let non_existent_peer = PeerId::from_bytes([0xFFu8; 32]);
1983        let result = node1
1984            .send_message(&non_existent_peer, "test-protocol", vec![])
1985            .await;
1986        assert!(result.is_err(), "Sending to non-existent peer should fail");
1987
1988        node1.stop().await?;
1989        node2.stop().await?;
1990
1991        Ok(())
1992    }
1993
1994    #[tokio::test]
1995    async fn test_remote_mcp_operations() -> Result<()> {
1996        let config = create_test_node_config();
1997        let node = P2PNode::new(config).await?;
1998
1999        // MCP removed; test reduced to simple start/stop
2000        node.start().await?;
2001        node.stop().await?;
2002        Ok(())
2003    }
2004
2005    #[tokio::test]
2006    async fn test_health_check() -> Result<()> {
2007        let config = create_test_node_config();
2008        let node = P2PNode::new(config).await?;
2009
2010        // Health check should pass with no connections
2011        let result = node.health_check().await;
2012        assert!(result.is_ok());
2013
2014        // Note: We're not actually connecting to real peers here
2015        // since that would require running bootstrap nodes.
2016        // The health check should still pass with no connections.
2017
2018        Ok(())
2019    }
2020
2021    #[tokio::test]
2022    async fn test_node_uptime() -> Result<()> {
2023        let config = create_test_node_config();
2024        let node = P2PNode::new(config).await?;
2025
2026        let uptime1 = node.uptime();
2027        assert!(uptime1 >= Duration::from_secs(0));
2028
2029        // Wait a bit
2030        tokio::time::sleep(Duration::from_millis(10)).await;
2031
2032        let uptime2 = node.uptime();
2033        assert!(uptime2 > uptime1);
2034
2035        Ok(())
2036    }
2037
2038    #[tokio::test]
2039    async fn test_node_config_access() -> Result<()> {
2040        let config = create_test_node_config();
2041        let node = P2PNode::new(config).await?;
2042
2043        let node_config = node.config();
2044        assert_eq!(node_config.max_connections, 100);
2045        // MCP removed
2046
2047        Ok(())
2048    }
2049
2050    #[tokio::test]
2051    async fn test_mcp_server_access() -> Result<()> {
2052        let config = create_test_node_config();
2053        let _node = P2PNode::new(config).await?;
2054
2055        // MCP removed
2056        Ok(())
2057    }
2058
2059    #[tokio::test]
2060    async fn test_dht_access() -> Result<()> {
2061        let config = create_test_node_config();
2062        let node = P2PNode::new(config).await?;
2063
2064        // DHT is always available
2065        let _dht = node.dht();
2066
2067        Ok(())
2068    }
2069
2070    #[tokio::test]
2071    async fn test_node_config_builder() -> Result<()> {
2072        let bootstrap: MultiAddr = "/ip4/127.0.0.1/udp/9000/quic".parse().unwrap();
2073
2074        let config = NodeConfig::builder()
2075            .local(true)
2076            .ipv6(true)
2077            .bootstrap_peer(bootstrap)
2078            .connection_timeout(Duration::from_secs(15))
2079            .max_connections(200)
2080            .max_message_size(TEST_MAX_MESSAGE_SIZE)
2081            .build()?;
2082
2083        assert_eq!(config.listen_addrs().len(), 2); // IPv4 + IPv6
2084        assert!(config.local);
2085        assert!(config.ipv6);
2086        assert_eq!(config.bootstrap_peers.len(), 1);
2087        assert_eq!(config.connection_timeout, Duration::from_secs(15));
2088        assert_eq!(config.max_connections, 200);
2089        assert_eq!(config.max_message_size, Some(TEST_MAX_MESSAGE_SIZE));
2090        assert!(config.allow_loopback); // auto-enabled by local(true)
2091
2092        Ok(())
2093    }
2094
2095    #[tokio::test]
2096    async fn test_bootstrap_peers() -> Result<()> {
2097        let mut config = create_test_node_config();
2098        config.bootstrap_peers = vec![
2099            crate::MultiAddr::from_ipv4(std::net::Ipv4Addr::LOCALHOST, 9200),
2100            crate::MultiAddr::from_ipv4(std::net::Ipv4Addr::LOCALHOST, 9201),
2101        ];
2102
2103        let node = P2PNode::new(config).await?;
2104
2105        // Start node (which attempts to connect to bootstrap peers)
2106        node.start().await?;
2107
2108        // In a test environment, bootstrap peers may not be available
2109        // The test verifies the node starts correctly with bootstrap configuration
2110        // Peer count may include local/internal tracking, so we just verify it's reasonable
2111        let _peer_count = node.peer_count().await;
2112
2113        node.stop().await?;
2114        Ok(())
2115    }
2116
2117    #[tokio::test]
2118    async fn test_peer_info_structure() {
2119        let peer_info = PeerInfo {
2120            channel_id: "test_peer".to_string(),
2121            addresses: vec!["/ip4/127.0.0.1/tcp/9000".parse::<MultiAddr>().unwrap()],
2122            connected_at: Instant::now(),
2123            last_seen: Instant::now(),
2124            status: ConnectionStatus::Connected,
2125            protocols: vec!["test-protocol".to_string()],
2126            heartbeat_count: 0,
2127        };
2128
2129        assert_eq!(peer_info.channel_id, "test_peer");
2130        assert_eq!(peer_info.addresses.len(), 1);
2131        assert_eq!(peer_info.status, ConnectionStatus::Connected);
2132        assert_eq!(peer_info.protocols.len(), 1);
2133    }
2134
2135    #[tokio::test]
2136    async fn test_serialization() -> Result<()> {
2137        // Test that configs can be serialized/deserialized
2138        let config = create_test_node_config();
2139        let serialized = serde_json::to_string(&config)?;
2140        let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
2141
2142        assert_eq!(config.local, deserialized.local);
2143        assert_eq!(config.port, deserialized.port);
2144        assert_eq!(config.ipv6, deserialized.ipv6);
2145        assert_eq!(config.bootstrap_peers, deserialized.bootstrap_peers);
2146
2147        Ok(())
2148    }
2149
2150    #[tokio::test]
2151    async fn test_get_channel_id_by_address_found() -> Result<()> {
2152        let config = create_test_node_config();
2153        let node = P2PNode::new(config).await?;
2154
2155        // Manually insert a peer for testing
2156        let test_channel_id = "peer_test_123".to_string();
2157        let test_address = "192.168.1.100:9000";
2158        let test_multiaddr = MultiAddr::quic(test_address.parse().unwrap());
2159
2160        let peer_info = PeerInfo {
2161            channel_id: test_channel_id.clone(),
2162            addresses: vec![test_multiaddr],
2163            connected_at: Instant::now(),
2164            last_seen: Instant::now(),
2165            status: ConnectionStatus::Connected,
2166            protocols: vec!["test-protocol".to_string()],
2167            heartbeat_count: 0,
2168        };
2169
2170        node.transport
2171            .inject_peer(test_channel_id.clone(), peer_info)
2172            .await;
2173
2174        // Test: Find channel by address
2175        let lookup_addr = MultiAddr::quic(test_address.parse().unwrap());
2176        let found_channel_id = node.get_channel_id_by_address(&lookup_addr).await;
2177        assert_eq!(found_channel_id, Some(test_channel_id));
2178
2179        Ok(())
2180    }
2181
2182    #[tokio::test]
2183    async fn test_get_channel_id_by_address_not_found() -> Result<()> {
2184        let config = create_test_node_config();
2185        let node = P2PNode::new(config).await?;
2186
2187        // Test: Try to find a channel that doesn't exist
2188        let unknown_addr = MultiAddr::quic("192.168.1.200:9000".parse().unwrap());
2189        let result = node.get_channel_id_by_address(&unknown_addr).await;
2190        assert_eq!(result, None);
2191
2192        Ok(())
2193    }
2194
2195    #[tokio::test]
2196    async fn test_get_channel_id_by_address_invalid_format() -> Result<()> {
2197        let config = create_test_node_config();
2198        let node = P2PNode::new(config).await?;
2199
2200        // Test: Non-IP address should return None (no matching socket addr)
2201        let ble_addr = MultiAddr::new(crate::address::TransportAddr::Ble {
2202            mac: [0x02, 0x00, 0x00, 0x00, 0x00, 0x01],
2203            psm: 0x0025,
2204        });
2205        let result = node.get_channel_id_by_address(&ble_addr).await;
2206        assert_eq!(result, None);
2207
2208        Ok(())
2209    }
2210
2211    #[tokio::test]
2212    async fn test_get_channel_id_by_address_multiple_peers() -> Result<()> {
2213        let config = create_test_node_config();
2214        let node = P2PNode::new(config).await?;
2215
2216        // Add multiple peers with different addresses
2217        let peer1_id = "peer_1".to_string();
2218        let peer1_addr_str = "192.168.1.101:9001";
2219        let peer1_multiaddr = MultiAddr::quic(peer1_addr_str.parse().unwrap());
2220
2221        let peer2_id = "peer_2".to_string();
2222        let peer2_addr_str = "192.168.1.102:9002";
2223        let peer2_multiaddr = MultiAddr::quic(peer2_addr_str.parse().unwrap());
2224
2225        let peer1_info = PeerInfo {
2226            channel_id: peer1_id.clone(),
2227            addresses: vec![peer1_multiaddr],
2228            connected_at: Instant::now(),
2229            last_seen: Instant::now(),
2230            status: ConnectionStatus::Connected,
2231            protocols: vec!["test-protocol".to_string()],
2232            heartbeat_count: 0,
2233        };
2234
2235        let peer2_info = PeerInfo {
2236            channel_id: peer2_id.clone(),
2237            addresses: vec![peer2_multiaddr],
2238            connected_at: Instant::now(),
2239            last_seen: Instant::now(),
2240            status: ConnectionStatus::Connected,
2241            protocols: vec!["test-protocol".to_string()],
2242            heartbeat_count: 0,
2243        };
2244
2245        node.transport
2246            .inject_peer(peer1_id.clone(), peer1_info)
2247            .await;
2248        node.transport
2249            .inject_peer(peer2_id.clone(), peer2_info)
2250            .await;
2251
2252        // Test: Find each channel by their unique address
2253        let found_peer1 = node
2254            .get_channel_id_by_address(&MultiAddr::quic(peer1_addr_str.parse().unwrap()))
2255            .await;
2256        let found_peer2 = node
2257            .get_channel_id_by_address(&MultiAddr::quic(peer2_addr_str.parse().unwrap()))
2258            .await;
2259
2260        assert_eq!(found_peer1, Some(peer1_id));
2261        assert_eq!(found_peer2, Some(peer2_id));
2262
2263        Ok(())
2264    }
2265
2266    #[tokio::test]
2267    async fn test_list_active_connections_empty() -> Result<()> {
2268        let config = create_test_node_config();
2269        let node = P2PNode::new(config).await?;
2270
2271        // Test: No connections initially
2272        let connections = node.list_active_connections().await;
2273        assert!(connections.is_empty());
2274
2275        Ok(())
2276    }
2277
2278    #[tokio::test]
2279    async fn test_list_active_connections_with_peers() -> Result<()> {
2280        let config = create_test_node_config();
2281        let node = P2PNode::new(config).await?;
2282
2283        // Add multiple peers
2284        let peer1_id = "peer_1".to_string();
2285        let peer1_addrs = vec![
2286            MultiAddr::quic("192.168.1.101:9001".parse().unwrap()),
2287            MultiAddr::quic("192.168.1.101:9002".parse().unwrap()),
2288        ];
2289
2290        let peer2_id = "peer_2".to_string();
2291        let peer2_addrs = vec![MultiAddr::quic("192.168.1.102:9003".parse().unwrap())];
2292
2293        let peer1_info = PeerInfo {
2294            channel_id: peer1_id.clone(),
2295            addresses: peer1_addrs.clone(),
2296            connected_at: Instant::now(),
2297            last_seen: Instant::now(),
2298            status: ConnectionStatus::Connected,
2299            protocols: vec!["test-protocol".to_string()],
2300            heartbeat_count: 0,
2301        };
2302
2303        let peer2_info = PeerInfo {
2304            channel_id: peer2_id.clone(),
2305            addresses: peer2_addrs.clone(),
2306            connected_at: Instant::now(),
2307            last_seen: Instant::now(),
2308            status: ConnectionStatus::Connected,
2309            protocols: vec!["test-protocol".to_string()],
2310            heartbeat_count: 0,
2311        };
2312
2313        node.transport
2314            .inject_peer(peer1_id.clone(), peer1_info)
2315            .await;
2316        node.transport
2317            .inject_peer(peer2_id.clone(), peer2_info)
2318            .await;
2319
2320        // Also add to active_connections (list_active_connections iterates over this)
2321        node.transport
2322            .inject_active_connection(peer1_id.clone())
2323            .await;
2324        node.transport
2325            .inject_active_connection(peer2_id.clone())
2326            .await;
2327
2328        // Test: List all active connections
2329        let connections = node.list_active_connections().await;
2330        assert_eq!(connections.len(), 2);
2331
2332        // Verify peer1 and peer2 are in the list
2333        let peer1_conn = connections.iter().find(|(id, _)| id == &peer1_id);
2334        let peer2_conn = connections.iter().find(|(id, _)| id == &peer2_id);
2335
2336        assert!(peer1_conn.is_some());
2337        assert!(peer2_conn.is_some());
2338
2339        // Verify addresses match
2340        assert_eq!(peer1_conn.unwrap().1, peer1_addrs);
2341        assert_eq!(peer2_conn.unwrap().1, peer2_addrs);
2342
2343        Ok(())
2344    }
2345
2346    #[tokio::test]
2347    async fn test_remove_channel_success() -> Result<()> {
2348        let config = create_test_node_config();
2349        let node = P2PNode::new(config).await?;
2350
2351        // Add a peer
2352        let channel_id = "peer_to_remove".to_string();
2353        let channel_peer_id = PeerId::from_name(&channel_id);
2354        let peer_info = PeerInfo {
2355            channel_id: channel_id.clone(),
2356            addresses: vec![MultiAddr::quic("192.168.1.100:9000".parse().unwrap())],
2357            connected_at: Instant::now(),
2358            last_seen: Instant::now(),
2359            status: ConnectionStatus::Connected,
2360            protocols: vec!["test-protocol".to_string()],
2361            heartbeat_count: 0,
2362        };
2363
2364        node.transport
2365            .inject_peer(channel_id.clone(), peer_info)
2366            .await;
2367        node.transport
2368            .inject_peer_to_channel(channel_peer_id, channel_id.clone())
2369            .await;
2370
2371        // Verify peer exists
2372        assert!(node.is_peer_connected(&channel_peer_id).await);
2373
2374        // Remove the channel
2375        let removed = node.remove_channel(&channel_id).await;
2376        assert!(removed);
2377
2378        // Verify peer no longer exists
2379        assert!(!node.is_peer_connected(&channel_peer_id).await);
2380
2381        Ok(())
2382    }
2383
2384    #[tokio::test]
2385    async fn test_remove_channel_nonexistent() -> Result<()> {
2386        let config = create_test_node_config();
2387        let node = P2PNode::new(config).await?;
2388
2389        // Try to remove a channel that doesn't exist
2390        let removed = node.remove_channel("nonexistent_peer").await;
2391        assert!(!removed);
2392
2393        Ok(())
2394    }
2395
2396    #[tokio::test]
2397    async fn test_is_peer_connected() -> Result<()> {
2398        let config = create_test_node_config();
2399        let node = P2PNode::new(config).await?;
2400
2401        let channel_id = "test_peer".to_string();
2402        let channel_peer_id = PeerId::from_name(&channel_id);
2403
2404        // Initially not connected
2405        assert!(!node.is_peer_connected(&channel_peer_id).await);
2406
2407        // Add peer
2408        let peer_info = PeerInfo {
2409            channel_id: channel_id.clone(),
2410            addresses: vec![MultiAddr::quic("192.168.1.100:9000".parse().unwrap())],
2411            connected_at: Instant::now(),
2412            last_seen: Instant::now(),
2413            status: ConnectionStatus::Connected,
2414            protocols: vec!["test-protocol".to_string()],
2415            heartbeat_count: 0,
2416        };
2417
2418        node.transport
2419            .inject_peer(channel_id.clone(), peer_info)
2420            .await;
2421        node.transport
2422            .inject_peer_to_channel(channel_peer_id, channel_id.clone())
2423            .await;
2424
2425        // Now connected
2426        assert!(node.is_peer_connected(&channel_peer_id).await);
2427
2428        // Remove channel
2429        node.remove_channel(&channel_id).await;
2430
2431        // No longer connected
2432        assert!(!node.is_peer_connected(&channel_peer_id).await);
2433
2434        Ok(())
2435    }
2436
2437    #[test]
2438    fn test_normalize_ipv6_wildcard() {
2439        use std::net::{IpAddr, Ipv6Addr, SocketAddr};
2440
2441        let wildcard = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 8080);
2442        let normalized = normalize_wildcard_to_loopback(wildcard);
2443
2444        assert_eq!(normalized.ip(), IpAddr::V6(Ipv6Addr::LOCALHOST));
2445        assert_eq!(normalized.port(), 8080);
2446    }
2447
2448    #[test]
2449    fn test_normalize_ipv4_wildcard() {
2450        use std::net::{IpAddr, Ipv4Addr, SocketAddr};
2451
2452        let wildcard = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9000);
2453        let normalized = normalize_wildcard_to_loopback(wildcard);
2454
2455        assert_eq!(normalized.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
2456        assert_eq!(normalized.port(), 9000);
2457    }
2458
2459    #[test]
2460    fn test_normalize_specific_address_unchanged() {
2461        let specific: std::net::SocketAddr = "192.168.1.100:3000".parse().unwrap();
2462        let normalized = normalize_wildcard_to_loopback(specific);
2463
2464        assert_eq!(normalized, specific);
2465    }
2466
2467    #[test]
2468    fn test_normalize_loopback_unchanged() {
2469        use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
2470
2471        let loopback_v6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5000);
2472        let normalized_v6 = normalize_wildcard_to_loopback(loopback_v6);
2473        assert_eq!(normalized_v6, loopback_v6);
2474
2475        let loopback_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
2476        let normalized_v4 = normalize_wildcard_to_loopback(loopback_v4);
2477        assert_eq!(normalized_v4, loopback_v4);
2478    }
2479
2480    // ---- parse_protocol_message regression tests ----
2481
2482    /// Get current Unix timestamp for tests
2483    fn current_timestamp() -> u64 {
2484        std::time::SystemTime::now()
2485            .duration_since(std::time::UNIX_EPOCH)
2486            .map(|d| d.as_secs())
2487            .unwrap_or(0)
2488    }
2489
2490    /// Helper to create a postcard-serialized WireMessage for tests
2491    fn make_wire_bytes(protocol: &str, data: Vec<u8>, from: &str, timestamp: u64) -> Vec<u8> {
2492        let msg = WireMessage {
2493            protocol: protocol.to_string(),
2494            data,
2495            from: PeerId::from_name(from),
2496            timestamp,
2497            user_agent: String::new(),
2498            public_key: Vec::new(),
2499            signature: Vec::new(),
2500        };
2501        postcard::to_stdvec(&msg).unwrap()
2502    }
2503
2504    #[test]
2505    fn test_parse_protocol_message_uses_transport_peer_id_as_source() {
2506        // Regression: For unsigned messages, P2PEvent::Message.source must be the
2507        // transport peer ID, NOT the "from" field from the wire message.
2508        let transport_id = "abcdef0123456789";
2509        let logical_id = "spoofed-logical-id";
2510        let bytes = make_wire_bytes("test/v1", vec![1, 2, 3], logical_id, current_timestamp());
2511
2512        let parsed =
2513            parse_protocol_message(&bytes, transport_id).expect("valid message should parse");
2514
2515        // Unsigned message: no authenticated node ID
2516        assert!(parsed.authenticated_node_id.is_none());
2517
2518        match parsed.event {
2519            P2PEvent::Message {
2520                topic,
2521                source,
2522                data,
2523            } => {
2524                assert!(source.is_none(), "unsigned message source must be None");
2525                assert_eq!(topic, "test/v1");
2526                assert_eq!(data, vec![1u8, 2, 3]);
2527            }
2528            other => panic!("expected P2PEvent::Message, got {:?}", other),
2529        }
2530    }
2531
2532    #[test]
2533    fn test_parse_protocol_message_rejects_invalid_bytes() {
2534        // Random bytes that are not valid bincode should be rejected
2535        assert!(parse_protocol_message(b"not valid bincode", "peer-id").is_none());
2536    }
2537
2538    #[test]
2539    fn test_parse_protocol_message_rejects_truncated_message() {
2540        // A truncated bincode message should fail to deserialize
2541        let full_bytes = make_wire_bytes("test/v1", vec![1, 2, 3], "sender", current_timestamp());
2542        let truncated = &full_bytes[..full_bytes.len() / 2];
2543        assert!(parse_protocol_message(truncated, "peer-id").is_none());
2544    }
2545
2546    #[test]
2547    fn test_parse_protocol_message_empty_payload() {
2548        let bytes = make_wire_bytes("ping", vec![], "sender", current_timestamp());
2549
2550        let parsed = parse_protocol_message(&bytes, "transport-peer")
2551            .expect("valid message with empty data should parse");
2552
2553        match parsed.event {
2554            P2PEvent::Message { data, .. } => assert!(data.is_empty()),
2555            other => panic!("expected P2PEvent::Message, got {:?}", other),
2556        }
2557    }
2558
2559    #[test]
2560    fn test_parse_protocol_message_preserves_binary_payload() {
2561        // Verify that arbitrary byte values (including 0xFF, 0x00) survive round-trip
2562        let payload: Vec<u8> = (0..=255).collect();
2563        let bytes = make_wire_bytes("binary/v1", payload.clone(), "sender", current_timestamp());
2564
2565        let parsed = parse_protocol_message(&bytes, "peer-id")
2566            .expect("valid message with full byte range should parse");
2567
2568        match parsed.event {
2569            P2PEvent::Message { data, topic, .. } => {
2570                assert_eq!(topic, "binary/v1");
2571                assert_eq!(
2572                    data, payload,
2573                    "payload must survive bincode round-trip exactly"
2574                );
2575            }
2576            other => panic!("expected P2PEvent::Message, got {:?}", other),
2577        }
2578    }
2579
2580    #[test]
2581    fn test_parse_signed_message_verifies_and_uses_node_id() {
2582        let identity = NodeIdentity::generate().expect("should generate identity");
2583        let protocol = "test/signed";
2584        let data: Vec<u8> = vec![10, 20, 30];
2585        // The `from` field must match the PeerId derived from the public key.
2586        let from = *identity.peer_id();
2587        let timestamp = current_timestamp();
2588        let user_agent = "test/1.0";
2589
2590        // Compute signable bytes the same way create_protocol_message does
2591        let signable =
2592            postcard::to_stdvec(&(protocol, data.as_slice(), &from, timestamp, user_agent))
2593                .unwrap();
2594        let sig = identity.sign(&signable).expect("signing should succeed");
2595
2596        let msg = WireMessage {
2597            protocol: protocol.to_string(),
2598            data: data.clone(),
2599            from,
2600            timestamp,
2601            user_agent: user_agent.to_string(),
2602            public_key: identity.public_key().as_bytes().to_vec(),
2603            signature: sig.as_bytes().to_vec(),
2604        };
2605        let bytes = postcard::to_stdvec(&msg).unwrap();
2606
2607        let parsed =
2608            parse_protocol_message(&bytes, "transport-xyz").expect("signed message should parse");
2609
2610        let expected_peer_id = *identity.peer_id();
2611        assert_eq!(
2612            parsed.authenticated_node_id.as_ref(),
2613            Some(&expected_peer_id)
2614        );
2615
2616        match parsed.event {
2617            P2PEvent::Message { source, .. } => {
2618                assert_eq!(
2619                    source.as_ref(),
2620                    Some(&expected_peer_id),
2621                    "source should be the verified PeerId"
2622                );
2623            }
2624            other => panic!("expected P2PEvent::Message, got {:?}", other),
2625        }
2626    }
2627
2628    #[test]
2629    fn test_parse_message_with_bad_signature_is_rejected() {
2630        let identity = NodeIdentity::generate().expect("should generate identity");
2631        let protocol = "test/bad-sig";
2632        let data: Vec<u8> = vec![1, 2, 3];
2633        let from = *identity.peer_id();
2634        let timestamp = current_timestamp();
2635        let user_agent = "test/1.0";
2636
2637        // Sign correct signable bytes
2638        let signable =
2639            postcard::to_stdvec(&(protocol, data.as_slice(), &from, timestamp, user_agent))
2640                .unwrap();
2641        let sig = identity.sign(&signable).expect("signing should succeed");
2642
2643        // Tamper with the data (signature was over [1,2,3], not [99,99,99])
2644        let msg = WireMessage {
2645            protocol: protocol.to_string(),
2646            data: vec![99, 99, 99],
2647            from,
2648            timestamp,
2649            user_agent: user_agent.to_string(),
2650            public_key: identity.public_key().as_bytes().to_vec(),
2651            signature: sig.as_bytes().to_vec(),
2652        };
2653        let bytes = postcard::to_stdvec(&msg).unwrap();
2654
2655        assert!(
2656            parse_protocol_message(&bytes, "transport-xyz").is_none(),
2657            "message with bad signature should be rejected"
2658        );
2659    }
2660
2661    #[test]
2662    fn test_parse_message_with_mismatched_from_is_rejected() {
2663        let identity = NodeIdentity::generate().expect("should generate identity");
2664        let protocol = "test/from-mismatch";
2665        let data: Vec<u8> = vec![1, 2, 3];
2666        // Use a `from` field that does NOT match the public key's PeerId.
2667        let fake_from = PeerId::from_bytes([0xDE; 32]);
2668        let timestamp = current_timestamp();
2669        let user_agent = "test/1.0";
2670
2671        let signable =
2672            postcard::to_stdvec(&(protocol, data.as_slice(), &fake_from, timestamp, user_agent))
2673                .unwrap();
2674        let sig = identity.sign(&signable).expect("signing should succeed");
2675
2676        let msg = WireMessage {
2677            protocol: protocol.to_string(),
2678            data,
2679            from: fake_from,
2680            timestamp,
2681            user_agent: user_agent.to_string(),
2682            public_key: identity.public_key().as_bytes().to_vec(),
2683            signature: sig.as_bytes().to_vec(),
2684        };
2685        let bytes = postcard::to_stdvec(&msg).unwrap();
2686
2687        assert!(
2688            parse_protocol_message(&bytes, "transport-xyz").is_none(),
2689            "message with mismatched from field should be rejected"
2690        );
2691    }
2692}