Skip to main content

saorsa_core/
network.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: david@saorsalabs.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! Network module
15//!
16//! This module provides core networking functionality for the P2P Foundation.
17//! It handles peer connections, network events, and node lifecycle management.
18
19use crate::PeerId;
20use crate::adaptive::trust::{TrustRecord, TrustSnapshot};
21use crate::adaptive::{AdaptiveDHT, AdaptiveDhtConfig, TrustEngine, TrustEvent};
22use crate::bootstrap::cache::{CachedCloseGroupPeer, CloseGroupCache};
23use crate::dht::core_engine::AddressType;
24use crate::dht_network_manager::{
25    DhtNetworkConfig, DhtNetworkEvent, DhtNetworkManager, IDENTITY_EXCHANGE_TIMEOUT,
26};
27use crate::error::{IdentityError, NetworkError, P2PError, P2pResult as Result};
28use crate::reachability::spawn_acquisition_driver;
29
30use crate::MultiAddr;
31use crate::identity::node_identity::{NodeIdentity, peer_id_from_public_key};
32use crate::quantum_crypto::saorsa_transport_integration::{MlDsaPublicKey, MlDsaSignature};
33use dashmap::DashMap;
34use futures::StreamExt;
35use parking_lot::Mutex as ParkingMutex;
36use serde::{Deserialize, Serialize};
37use std::collections::HashMap;
38use std::net::SocketAddr;
39use std::path::{Path, PathBuf};
40use std::sync::Arc;
41use std::sync::atomic::{AtomicBool, Ordering};
42use std::time::{Duration, SystemTime, UNIX_EPOCH};
43use tokio::sync::{Mutex as TokioMutex, RwLock, broadcast};
44use tokio::time::Instant;
45use tokio_util::sync::CancellationToken;
46use tracing::{debug, info, trace, warn};
47
48/// Wire protocol message format for P2P communication.
49///
50/// Serialized with postcard for compact binary encoding.
51#[derive(Debug, Clone, Serialize, Deserialize)]
52pub(crate) struct WireMessage {
53    /// Protocol/topic identifier
54    pub(crate) protocol: String,
55    /// Raw payload bytes
56    pub(crate) data: Vec<u8>,
57    /// Sender's peer ID (verified against transport-level identity)
58    pub(crate) from: PeerId,
59    /// Unix timestamp in seconds
60    pub(crate) timestamp: u64,
61    /// User agent string identifying the sender's software.
62    ///
63    /// Convention: `"node/<version>"` for full DHT participants,
64    /// `"client/<version>"` or `"<app>/<version>"` for ephemeral clients.
65    /// Included in the signed bytes — tamper-proof.
66    #[serde(default)]
67    pub(crate) user_agent: String,
68    /// Sender's ML-DSA-65 public key (1952 bytes). Empty if unsigned.
69    #[serde(default)]
70    pub(crate) public_key: Vec<u8>,
71    /// ML-DSA-65 signature over the signable bytes. Empty if unsigned.
72    #[serde(default)]
73    pub(crate) signature: Vec<u8>,
74}
75
76/// Operating mode of a P2P node.
77///
78/// Determines the default user agent and DHT participation behavior.
79/// `Node` peers participate in the DHT routing table; `Client` peers
80/// are treated as ephemeral and excluded from routing.
81#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
82pub enum NodeMode {
83    /// Full DHT-participant node that maintains routing state and routes messages.
84    #[default]
85    Node,
86    /// Ephemeral client that connects to perform operations without joining the DHT.
87    Client,
88}
89
/// Which local network interfaces the listener binds to (internal only).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum ListenMode {
    /// Every interface: `0.0.0.0` for IPv4, `::` for IPv6.
    Public,
    /// Loopback only: `127.0.0.1` for IPv4, `::1` for IPv6.
    Local,
}
98
99/// Returns the default user agent string for the given mode.
100///
101/// - `Node` → `"node/<saorsa-core-version>"`
102/// - `Client` → `"client/<saorsa-core-version>"`
103pub fn user_agent_for_mode(mode: NodeMode) -> String {
104    let prefix = match mode {
105        NodeMode::Node => "node",
106        NodeMode::Client => "client",
107    };
108    format!("{prefix}/{}", env!("CARGO_PKG_VERSION"))
109}
110
/// Whether `user_agent` identifies a full DHT participant.
///
/// Only agents beginning with `"node/"` qualify. The comparison is a
/// case-sensitive prefix match.
pub fn is_dht_participant(user_agent: &str) -> bool {
    user_agent.strip_prefix("node/").is_some()
}
115
/// Buffer size of the internal channel feeding the message receiving system.
pub(crate) const MESSAGE_RECV_CHANNEL_CAPACITY: usize = 256;

/// Upper bound on request/response operations in flight at the same time.
pub(crate) const MAX_ACTIVE_REQUESTS: usize = 256;

/// Longest timeout a single request may use (five minutes).
pub(crate) const MAX_REQUEST_TIMEOUT: Duration = Duration::from_secs(5 * 60);

/// Port the node listens on when none is configured.
const DEFAULT_LISTEN_PORT: u16 = 9000;

/// Cap on concurrent connections when none is configured.
const DEFAULT_MAX_CONNECTIONS: usize = 10_000;

/// Connection timeout, in seconds, used when none is configured.
///
/// Each direct Happy Eyeballs attempt in the transport adapter is kept short
/// so DHT lookups skip offline peers quickly. This overall 25 s budget still
/// leaves room for multi-stage connection strategies plus identity exchange,
/// and it matches the historical API default.
const DEFAULT_CONNECTION_TIMEOUT_SECS: u64 = 25;

/// Seconds to wait for a bootstrap peer to complete identity exchange.
///
/// This is deliberately tighter than the 5 s post-bootstrap budget
/// (`IDENTITY_EXCHANGE_TIMEOUT`). Bootstrap candidates are unverified, and a
/// stuck one must not head-of-line block convergence.
///
/// 3 s covers loopback (<100 ms) and direct WAN paths (~1–2 s, including one
/// handshake retry). A relay-tunnelled path with congested ML-DSA
/// verification can exceed it and fail identity exchange. In that case
/// bootstrap moves on to other candidates instead of retrying that one.
///
/// `wait_for_peer_identity` returns early when the channel closes, so most
/// dead channels surface within microseconds regardless of this value.
const BOOTSTRAP_IDENTITY_TIMEOUT_SECS: u64 = 3;

/// Maximum number of configured bootstrap peers dialed concurrently in Phase B.
///
/// Every dial runs a full ML-KEM key exchange and ML-DSA verification, and a
/// cold-starting node has no spare compute budget. The fan-out is therefore
/// kept low so simultaneous QUIC+PQC handshakes neither spike CPU nor
/// saturate the UDP socket.
const MAX_CONCURRENT_BOOTSTRAP_DIALS: usize = 4;

/// Number of successful bootstrap connections after which a client-mode node
/// stops dialing further candidates.
///
/// A client only routes its own lookups and does not serve the DHT. With α = 3
/// parallel queries, 6 peers give roughly 2× redundancy, and a fully
/// populated close group gains a client nothing.
///
/// Stopping early trims cold-start latency by skipping the tail of slow or
/// dead candidates. Full nodes always dial every candidate so their routing
/// tables converge completely.
const CLIENT_BOOTSTRAP_TARGET: usize = 6;
170
/// Default-value provider for `#[serde(default = "default_true")]` fields.
///
/// Always yields `true`. It is a `const fn`, so it may also be used in
/// constant contexts.
const fn default_true() -> bool {
    true
}
175
176/// Configuration for a P2P node
177#[derive(Debug, Clone, Serialize, Deserialize)]
178pub struct NodeConfig {
179    /// Bind to loopback only (`127.0.0.1` / `::1`).
180    ///
181    /// When `true`, the node listens on loopback addresses suitable for
182    /// local development and testing. When `false` (the default), the node
183    /// listens on all interfaces (`0.0.0.0` / `::`).
184    #[serde(default)]
185    pub local: bool,
186
187    /// Listen port. `0` means OS-assigned ephemeral port.
188    #[serde(default)]
189    pub port: u16,
190
191    /// Enable IPv6 dual-stack binding.
192    ///
193    /// When `true` (the default), both an IPv4 and an IPv6 address are
194    /// bound. When `false`, only IPv4 is used.
195    #[serde(default = "default_true")]
196    pub ipv6: bool,
197
198    /// Bootstrap peers to connect to on startup.
199    pub bootstrap_peers: Vec<crate::MultiAddr>,
200
201    // MCP removed; will be redesigned later
202    /// Connection timeout duration
203    pub connection_timeout: Duration,
204
205    /// Maximum number of concurrent connections
206    pub max_connections: usize,
207
208    /// DHT configuration
209    pub dht_config: DHTConfig,
210
211    /// Optional IP diversity configuration for Sybil protection tuning.
212    ///
213    /// When set, this configuration is used by diversity-enforcing subsystems.
214    /// If `None`, defaults are used.
215    pub diversity_config: Option<crate::security::IPDiversityConfig>,
216
217    /// Optional override for the maximum application-layer message size.
218    ///
219    /// When `None`, the underlying saorsa-transport default is used.
220    #[serde(default)]
221    pub max_message_size: Option<usize>,
222
223    /// Optional node identity for app-level message signing.
224    ///
225    /// When set, outgoing messages are signed with the node's ML-DSA-65 key
226    /// and incoming signed messages are verified at the transport layer.
227    #[serde(skip)]
228    pub node_identity: Option<Arc<NodeIdentity>>,
229
230    /// Operating mode of this node.
231    ///
232    /// Determines the default user agent and DHT participation:
233    /// - `Node` → user agent `"node/<version>"`, added to DHT routing tables.
234    /// - `Client` → user agent `"client/<version>"`, treated as ephemeral.
235    #[serde(default)]
236    pub mode: NodeMode,
237
238    /// Optional custom user agent override.
239    ///
240    /// When `Some`, this value is used instead of the mode-derived default.
241    /// When `None`, the user agent is derived from [`NodeConfig::mode`].
242    #[serde(default, skip_serializing_if = "Option::is_none")]
243    pub custom_user_agent: Option<String>,
244
245    /// Allow loopback addresses (127.0.0.1, ::1) in the transport layer.
246    ///
247    /// In production, loopback addresses are rejected because they are not
248    /// routable. Enable this for local devnets and testnets where all nodes
249    /// run on the same machine.
250    ///
251    /// Default: `false`
252    #[serde(default)]
253    pub allow_loopback: bool,
254
255    /// Adaptive DHT configuration (trust-based swap-out).
256    ///
257    /// Controls whether peers with low trust scores are eligible for
258    /// swap-out from the routing table when better candidates arrive. Use
259    /// `NodeConfigBuilder::trust_enforcement` for a simple on/off toggle.
260    ///
261    /// Default: enabled with a swap threshold of 0.35.
262    #[serde(default)]
263    pub adaptive_dht_config: AdaptiveDhtConfig,
264
265    /// Optional path for persisting the close group cache.
266    ///
267    /// Directory for persisting the close group cache.
268    ///
269    /// When set, the node saves its close group peers and their trust
270    /// scores to `{dir}/close_group_cache.json` on shutdown and after
271    /// bootstrap. On startup, cached peers are loaded and contacted
272    /// first, preserving close group consistency across restarts.
273    ///
274    /// When `None`, no close group cache is used.
275    #[serde(default, skip_serializing_if = "Option::is_none")]
276    pub close_group_cache_dir: Option<PathBuf>,
277}
278
279/// DHT-specific configuration
280#[derive(Debug, Clone, Serialize, Deserialize)]
281pub struct DHTConfig {
282    /// Kademlia K parameter (bucket size)
283    pub k_value: usize,
284
285    /// Kademlia alpha parameter (parallelism)
286    pub alpha_value: usize,
287
288    /// DHT refresh interval
289    pub refresh_interval: Duration,
290}
291
292// ============================================================================
293// Address Construction Helpers
294// ============================================================================
295
296/// Build QUIC listen addresses based on port, IPv6 preference, and listen mode.
297///
298/// All returned addresses use the QUIC transport — the only transport
299/// currently supported for dialing. When additional transports are added,
300/// extend this function to produce addresses for those transports as well.
301///
302/// `ListenMode::Public` uses unspecified (all-interface) addresses;
303/// `ListenMode::Local` uses loopback addresses.
304#[inline]
305fn build_listen_addrs(port: u16, ipv6_enabled: bool, mode: ListenMode) -> Vec<MultiAddr> {
306    let mut addrs = Vec::with_capacity(if ipv6_enabled { 2 } else { 1 });
307
308    let (v4, v6) = match mode {
309        ListenMode::Public => (
310            std::net::Ipv4Addr::UNSPECIFIED,
311            std::net::Ipv6Addr::UNSPECIFIED,
312        ),
313        ListenMode::Local => (std::net::Ipv4Addr::LOCALHOST, std::net::Ipv6Addr::LOCALHOST),
314    };
315
316    if ipv6_enabled {
317        addrs.push(MultiAddr::quic(std::net::SocketAddr::new(
318            std::net::IpAddr::V6(v6),
319            port,
320        )));
321    }
322
323    addrs.push(MultiAddr::quic(std::net::SocketAddr::new(
324        std::net::IpAddr::V4(v4),
325        port,
326    )));
327
328    addrs
329}
330
331impl NodeConfig {
332    /// Returns the effective user agent string.
333    ///
334    /// If a custom user agent was set, returns that. Otherwise, derives
335    /// the user agent from the node's [`NodeMode`].
336    pub fn user_agent(&self) -> String {
337        self.custom_user_agent
338            .clone()
339            .unwrap_or_else(|| user_agent_for_mode(self.mode))
340    }
341
342    /// Compute the listen addresses from the configuration fields.
343    ///
344    /// The returned addresses are derived from [`local`](Self::local),
345    /// [`port`](Self::port), and [`ipv6`](Self::ipv6).
346    pub fn listen_addrs(&self) -> Vec<MultiAddr> {
347        let mode = if self.local {
348            ListenMode::Local
349        } else {
350            ListenMode::Public
351        };
352        build_listen_addrs(self.port, self.ipv6, mode)
353    }
354
355    /// Create a new NodeConfig with default values
356    ///
357    /// # Errors
358    ///
359    /// Returns an error if default addresses cannot be parsed
360    pub fn new() -> Result<Self> {
361        Ok(Self::default())
362    }
363
364    /// Create a builder for customized NodeConfig construction
365    pub fn builder() -> NodeConfigBuilder {
366        NodeConfigBuilder::default()
367    }
368}
369
370// ============================================================================
371// NodeConfig Builder Pattern
372// ============================================================================
373
374/// Builder for constructing [`NodeConfig`] with a transport-aware fluent API.
375///
376/// Defaults are chosen for quick local development:
377/// - QUIC on a random free port (`0`)
378/// - IPv6 enabled (dual-stack)
379/// - All interfaces (not local-only)
380///
381/// # Examples
382///
383/// ```rust,ignore
384/// // Simplest — QUIC on random port, IPv6 on, all interfaces
385/// let config = NodeConfig::builder().build()?;
386///
387/// // Local dev/test mode (loopback, auto-enables allow_loopback)
388/// let config = NodeConfig::builder()
389///     .local(true)
390///     .build()?;
391/// ```
392#[derive(Debug, Clone)]
393pub struct NodeConfigBuilder {
394    port: u16,
395    ipv6: bool,
396    local: bool,
397    bootstrap_peers: Vec<crate::MultiAddr>,
398    max_connections: Option<usize>,
399    connection_timeout: Option<Duration>,
400    dht_config: Option<DHTConfig>,
401    max_message_size: Option<usize>,
402    mode: NodeMode,
403    custom_user_agent: Option<String>,
404    allow_loopback: Option<bool>,
405    adaptive_dht_config: Option<AdaptiveDhtConfig>,
406    close_group_cache_dir: Option<PathBuf>,
407}
408
409impl Default for NodeConfigBuilder {
410    fn default() -> Self {
411        Self {
412            port: 0,
413            ipv6: true,
414            local: false,
415            bootstrap_peers: Vec::new(),
416            max_connections: None,
417            connection_timeout: None,
418            dht_config: None,
419            max_message_size: None,
420            mode: NodeMode::default(),
421            custom_user_agent: None,
422            allow_loopback: None,
423            adaptive_dht_config: None,
424            close_group_cache_dir: None,
425        }
426    }
427}
428
429impl NodeConfigBuilder {
430    /// Set the listen port. Default: `0` (random free port).
431    pub fn port(mut self, port: u16) -> Self {
432        self.port = port;
433        self
434    }
435
436    /// Enable or disable IPv6 dual-stack. Default: `true`.
437    pub fn ipv6(mut self, enabled: bool) -> Self {
438        self.ipv6 = enabled;
439        self
440    }
441
442    /// Bind to loopback only (`true`) or all interfaces (`false`).
443    ///
444    /// When `true`, automatically enables `allow_loopback` unless explicitly
445    /// overridden via [`Self::allow_loopback`].
446    ///
447    /// Default: `false` (all interfaces).
448    pub fn local(mut self, local: bool) -> Self {
449        self.local = local;
450        self
451    }
452
453    /// Add a bootstrap peer.
454    pub fn bootstrap_peer(mut self, addr: crate::MultiAddr) -> Self {
455        self.bootstrap_peers.push(addr);
456        self
457    }
458
459    /// Set maximum connections.
460    pub fn max_connections(mut self, max: usize) -> Self {
461        self.max_connections = Some(max);
462        self
463    }
464
465    /// Set connection timeout.
466    pub fn connection_timeout(mut self, timeout: Duration) -> Self {
467        self.connection_timeout = Some(timeout);
468        self
469    }
470
471    /// Set DHT configuration.
472    pub fn dht_config(mut self, config: DHTConfig) -> Self {
473        self.dht_config = Some(config);
474        self
475    }
476
477    /// Set maximum application-layer message size in bytes.
478    ///
479    /// If this method is not called, saorsa-transport's built-in default is used.
480    pub fn max_message_size(mut self, max_message_size: usize) -> Self {
481        self.max_message_size = Some(max_message_size);
482        self
483    }
484
485    /// Set the operating mode (Node or Client).
486    pub fn mode(mut self, mode: NodeMode) -> Self {
487        self.mode = mode;
488        self
489    }
490
491    /// Set a custom user agent string, overriding the mode-derived default.
492    pub fn custom_user_agent(mut self, user_agent: impl Into<String>) -> Self {
493        self.custom_user_agent = Some(user_agent.into());
494        self
495    }
496
497    /// Explicitly control whether loopback addresses are allowed in the
498    /// transport layer. When not called, `local(true)` auto-enables this;
499    /// `local(false)` defaults to `false`.
500    pub fn allow_loopback(mut self, allow: bool) -> Self {
501        self.allow_loopback = Some(allow);
502        self
503    }
504
505    /// Enable or disable trust-based peer swap-out.
506    ///
507    /// When `false`, peers are never swapped out of the routing table
508    /// based on trust scores. Trust scores are still tracked but have
509    /// no enforcement effect.
510    ///
511    /// When `true` (the default), peers whose trust score falls below the
512    /// swap threshold (0.35) become eligible for replacement when a
513    /// better candidate arrives.
514    ///
515    /// For fine-grained control over the threshold, use
516    /// [`adaptive_dht_config`](Self::adaptive_dht_config) instead.
517    pub fn trust_enforcement(mut self, enabled: bool) -> Self {
518        let threshold = if enabled {
519            AdaptiveDhtConfig::default().swap_threshold
520        } else {
521            0.0
522        };
523        self.adaptive_dht_config = Some(AdaptiveDhtConfig {
524            swap_threshold: threshold,
525        });
526        self
527    }
528
529    /// Set the full adaptive DHT configuration.
530    ///
531    /// Overrides any previous call to [`trust_enforcement`](Self::trust_enforcement).
532    pub fn adaptive_dht_config(mut self, config: AdaptiveDhtConfig) -> Self {
533        self.adaptive_dht_config = Some(config);
534        self
535    }
536
537    /// Set the directory for persisting the close group cache.
538    ///
539    /// The node writes `close_group_cache.json` inside this directory on
540    /// shutdown and after bootstrap, and loads it on startup.
541    pub fn close_group_cache_dir(mut self, path: impl Into<PathBuf>) -> Self {
542        self.close_group_cache_dir = Some(path.into());
543        self
544    }
545
546    /// Build the [`NodeConfig`].
547    ///
548    /// # Errors
549    ///
550    /// Returns an error if address construction fails.
551    pub fn build(self) -> Result<NodeConfig> {
552        // local mode auto-enables allow_loopback unless explicitly overridden
553        let allow_loopback = self.allow_loopback.unwrap_or(self.local);
554
555        Ok(NodeConfig {
556            local: self.local,
557            port: self.port,
558            ipv6: self.ipv6,
559            bootstrap_peers: self.bootstrap_peers,
560            connection_timeout: self
561                .connection_timeout
562                .unwrap_or(Duration::from_secs(DEFAULT_CONNECTION_TIMEOUT_SECS)),
563            max_connections: self.max_connections.unwrap_or(DEFAULT_MAX_CONNECTIONS),
564            dht_config: self.dht_config.unwrap_or_default(),
565            diversity_config: None,
566            max_message_size: self.max_message_size,
567            node_identity: None,
568            mode: self.mode,
569            custom_user_agent: self.custom_user_agent,
570            allow_loopback,
571            adaptive_dht_config: self.adaptive_dht_config.unwrap_or_default(),
572            close_group_cache_dir: self.close_group_cache_dir,
573        })
574    }
575}
576
577impl Default for NodeConfig {
578    fn default() -> Self {
579        Self {
580            local: false,
581            port: DEFAULT_LISTEN_PORT,
582            ipv6: true,
583            bootstrap_peers: Vec::new(),
584            connection_timeout: Duration::from_secs(DEFAULT_CONNECTION_TIMEOUT_SECS),
585            max_connections: DEFAULT_MAX_CONNECTIONS,
586            dht_config: DHTConfig::default(),
587            diversity_config: None,
588            max_message_size: None,
589            node_identity: None,
590            mode: NodeMode::default(),
591            custom_user_agent: None,
592            allow_loopback: false,
593            adaptive_dht_config: AdaptiveDhtConfig::default(),
594            close_group_cache_dir: None,
595        }
596    }
597}
598
599impl DHTConfig {
600    /// Default K value (bucket size) for Kademlia routing.
601    pub const DEFAULT_K_VALUE: usize = 20;
602    const DEFAULT_ALPHA_VALUE: usize = 3;
603    const DEFAULT_REFRESH_INTERVAL_SECS: u64 = 600;
604    /// Minimum k_value — values below this produce degenerate routing behavior.
605    const MIN_K_VALUE: usize = 4;
606
607    /// Validate parameter safety constraints (Section 4 points 1-13).
608    ///
609    /// Returns `Err` if any constraint is violated.
610    pub fn validate(&self) -> Result<()> {
611        if self.k_value < Self::MIN_K_VALUE {
612            return Err(P2PError::Validation(
613                format!(
614                    "k_value must be >= {} (got {}), values below {} produce degenerate behavior",
615                    Self::MIN_K_VALUE,
616                    self.k_value,
617                    Self::MIN_K_VALUE,
618                )
619                .into(),
620            ));
621        }
622        if self.alpha_value < 1 {
623            return Err(P2PError::Validation(
624                format!("alpha_value must be >= 1 (got {})", self.alpha_value).into(),
625            ));
626        }
627        if self.refresh_interval.is_zero() {
628            return Err(P2PError::Validation("refresh_interval must be > 0".into()));
629        }
630        Ok(())
631    }
632}
633
634impl Default for DHTConfig {
635    fn default() -> Self {
636        Self {
637            k_value: Self::DEFAULT_K_VALUE,
638            alpha_value: Self::DEFAULT_ALPHA_VALUE,
639            refresh_interval: Duration::from_secs(Self::DEFAULT_REFRESH_INTERVAL_SECS),
640        }
641    }
642}
643
644/// Information about a connected peer
645#[derive(Debug, Clone)]
646pub struct PeerInfo {
647    /// Transport-level channel identifier (internal use only).
648    #[allow(dead_code)]
649    pub(crate) channel_id: String,
650
651    /// Peer's addresses
652    pub addresses: Vec<MultiAddr>,
653
654    /// Connection timestamp
655    pub connected_at: Instant,
656
657    /// Last seen timestamp
658    pub last_seen: Instant,
659
660    /// Connection status
661    pub status: ConnectionStatus,
662
663    /// Supported protocols
664    pub protocols: Vec<String>,
665
666    /// Number of heartbeats received
667    pub heartbeat_count: u64,
668}
669
/// Lifecycle state of a connection to a peer.
///
/// Also implements `Eq`: every variant, including the `String` carried by
/// `Failed`, has total equality. The status can therefore be used wherever
/// an `Eq` bound is required.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionStatus {
    /// Connection is being established.
    Connecting,
    /// Connection is established and active.
    Connected,
    /// Connection is being closed.
    Disconnecting,
    /// Connection is closed.
    Disconnected,
    /// Connection failed, with an accompanying message.
    Failed(String),
}
684
685/// Network events that can occur in the P2P system
686///
687/// Events are broadcast to all listeners and provide real-time
688/// notifications of network state changes and message arrivals.
689#[derive(Debug, Clone)]
690pub enum P2PEvent {
691    /// Message received from a peer on a specific topic
692    Message {
693        /// Topic or channel the message was sent on
694        topic: String,
695        /// For signed messages this is the authenticated app-level [`PeerId`];
696        /// `None` for unsigned messages.
697        source: Option<PeerId>,
698        /// IP transport address that delivered this message, when known.
699        ///
700        /// This is provenance metadata, not an identity signal.
701        transport_source: Option<MultiAddr>,
702        /// Sender-supplied Unix timestamp in seconds.
703        ///
704        /// For signed messages this value is covered by the ML-DSA-65 signature
705        /// alongside the payload, so handlers can use it for application-level
706        /// freshness or replay defense. Wire-level acceptance no longer gates
707        /// on this value; subscribers MUST do their own age/dedup checks when
708        /// the protocol requires them.
709        timestamp: u64,
710        /// Raw message data payload
711        data: Vec<u8>,
712    },
713    /// An authenticated peer has connected (first signed message verified on any channel).
714    /// The `user_agent` identifies the remote software (e.g. `"node/0.12.1"`, `"client/1.0"`).
715    PeerConnected(PeerId, String),
716    /// An authenticated peer has fully disconnected (all channels closed).
717    PeerDisconnected(PeerId),
718}
719
720/// Response from a peer to a request sent via [`P2PNode::send_request`].
721///
722/// Contains the response payload along with metadata about the responder
723/// and round-trip latency.
724#[derive(Debug, Clone)]
725pub struct PeerResponse {
726    /// The peer that sent the response.
727    pub peer_id: PeerId,
728    /// Raw response payload bytes.
729    pub data: Vec<u8>,
730    /// Round-trip latency from request to response.
731    pub latency: Duration,
732}
733
734/// Wire format for request/response correlation.
735///
736/// Wraps application payloads with a message ID and direction flag
737/// so the receive loop can route responses back to waiting callers.
738#[derive(Debug, Clone, Serialize, Deserialize)]
739pub(crate) struct RequestResponseEnvelope {
740    /// Unique identifier to correlate request ↔ response.
741    pub(crate) message_id: String,
742    /// `false` for requests, `true` for responses.
743    pub(crate) is_response: bool,
744    /// Application payload.
745    pub(crate) payload: Vec<u8>,
746}
747
748/// An in-flight request awaiting a response from a specific peer.
749pub(crate) struct PendingRequest {
750    /// Oneshot sender for delivering the response payload.
751    pub(crate) response_tx: tokio::sync::oneshot::Sender<Vec<u8>>,
752    /// The peer we expect the response from (for origin validation).
753    pub(crate) expected_peer: PeerId,
754}
755
/// Brief pause between closing stale QUIC connections and re-dialing.
///
/// `disconnect_channel` is async and already waits for the QUIC close, but
/// the transport endpoint may still need a moment to release its internal
/// state. The pause is applied only when stale channels were actually
/// disconnected.
const QUIC_TEARDOWN_GRACE: Duration = Duration::from_millis(100);
762
763/// Main P2P network node that manages connections, routing, and communication
764///
765/// This struct represents a complete P2P network participant that can:
766/// - Connect to other peers via QUIC transport
767/// - Participate in distributed hash table (DHT) operations
768/// - Send and receive messages through various protocols
769/// - Handle network events and peer lifecycle
770///
771/// Transport concerns (connections, messaging, events) are delegated to
772/// `TransportHandle`.
773pub struct P2PNode {
774    /// Node configuration
775    config: NodeConfig,
776
777    /// Our peer ID
778    peer_id: PeerId,
779
780    /// Transport handle owning all QUIC / peer / event state
781    transport: Arc<crate::transport_handle::TransportHandle>,
782
783    /// Node start time
784    start_time: Instant,
785
786    /// Shutdown token — cancelled when the node should stop
787    shutdown: CancellationToken,
788
789    /// Adaptive DHT layer — owns both the DHT manager and the trust engine.
790    /// All DHT operations and trust signals go through this component.
791    adaptive_dht: AdaptiveDHT,
792
793    /// Bootstrap state tracking - indicates whether peer discovery has completed
794    is_bootstrapped: Arc<AtomicBool>,
795
796    /// Whether `start()` has been called (and `stop()` has not yet completed)
797    is_started: Arc<AtomicBool>,
798
799    /// Per-peer locks that serialise reconnect attempts so concurrent sends
800    /// to the same stale peer don't race to dial.  Entries accumulate over
801    /// the node's lifetime; each is a lightweight `Arc<TokioMutex<()>>`.
802    reconnect_locks: ParkingMutex<HashMap<PeerId, Arc<TokioMutex<()>>>>,
803
804    /// The peer ID of the node currently relaying traffic for us (ADR-014).
805    ///
806    /// Set after the reachability classifier acquires a relay in `start()`.
807    /// The relayer monitor watches this against the K-closest set: if the
808    /// relayer drops out, it triggers rebinding.
809    ///
810    /// `None` when the node is publicly reachable (no relay needed) or
811    /// before classification has run.
812    relayer_peer_id: Arc<RwLock<Option<PeerId>>>,
813
814    /// The relay-allocated public address (ADR-014).
815    ///
816    /// Set after a proactive MASQUE relay is acquired in `start()`. This is
817    /// the address that external peers must dial to reach this node through
818    /// the relay. `None` when the node is publicly reachable (no relay) or
819    /// before classification has run.
820    relay_address: Arc<RwLock<Option<SocketAddr>>>,
821}
822
/// Rewrite a wildcard bind address into a loopback address that can be dialed.
///
/// Unspecified addresses (`0.0.0.0`, `[::]`) are only meaningful for binding.
/// saorsa-transport rightly refuses to connect *to* them, so local
/// connections need a concrete loopback target instead:
/// - `[::]:port` becomes `[::1]:port`
/// - `0.0.0.0:port` becomes `127.0.0.1:port`
/// - any other address is returned unchanged
///
/// The port is preserved. IPv6 flow info and scope ID are reset to zero on
/// rewritten addresses.
pub(crate) fn normalize_wildcard_to_loopback(addr: std::net::SocketAddr) -> std::net::SocketAddr {
    use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};

    if !addr.ip().is_unspecified() {
        return addr;
    }

    let port = addr.port();
    match addr {
        SocketAddr::V4(_) => SocketAddr::from((Ipv4Addr::LOCALHOST, port)),
        SocketAddr::V6(_) => SocketAddr::from((Ipv6Addr::LOCALHOST, port)),
    }
}
853
854impl P2PNode {
855    /// Create a new P2P node with the given configuration
856    pub async fn new(config: NodeConfig) -> Result<Self> {
857        // Ensure a cryptographic identity exists — generate one if not provided.
858        let node_identity = match config.node_identity.clone() {
859            Some(identity) => identity,
860            None => Arc::new(NodeIdentity::generate()?),
861        };
862
863        // Derive the canonical peer ID from the cryptographic identity.
864        let peer_id = *node_identity.peer_id();
865
866        // Validate parameter safety constraints (Section 4 points 1-13).
867        // Reject invalid config early, before any resources are allocated.
868        config.dht_config.validate()?;
869        if let Some(ref diversity) = config.diversity_config {
870            diversity
871                .validate()
872                .map_err(|e| P2PError::Validation(format!("IP diversity config: {e}").into()))?;
873        }
874
875        // Build transport handle with all transport-level concerns
876        let transport_config = crate::transport_handle::TransportConfig::from_node_config(
877            &config,
878            crate::DEFAULT_EVENT_CHANNEL_CAPACITY,
879            node_identity.clone(),
880        );
881        let transport =
882            Arc::new(crate::transport_handle::TransportHandle::new(transport_config).await?);
883
884        // Initialize AdaptiveDHT — creates the trust engine and DHT manager
885        let dht_manager_config = DhtNetworkConfig {
886            peer_id,
887            node_config: config.clone(),
888            request_timeout: config.connection_timeout,
889            max_concurrent_operations: MAX_ACTIVE_REQUESTS,
890            enable_security: true,
891            swap_threshold: 0.0, // Set by AdaptiveDHT::new() from AdaptiveDhtConfig
892        };
893        let adaptive_dht = AdaptiveDHT::new(
894            transport.clone(),
895            dht_manager_config,
896            config.adaptive_dht_config.clone(),
897        )
898        .await?;
899
900        let node = Self {
901            config,
902            peer_id,
903            transport,
904            start_time: Instant::now(),
905            shutdown: CancellationToken::new(),
906            adaptive_dht,
907            is_bootstrapped: Arc::new(AtomicBool::new(false)),
908            is_started: Arc::new(AtomicBool::new(false)),
909            reconnect_locks: ParkingMutex::new(HashMap::new()),
910            relayer_peer_id: Arc::new(RwLock::new(None)),
911            relay_address: Arc::new(RwLock::new(None)),
912        };
913        info!(
914            "Created P2P node with peer ID: {} (call start() to begin networking)",
915            node.peer_id
916        );
917
918        Ok(node)
919    }
920
921    /// Get the peer ID of this node.
922    pub fn peer_id(&self) -> &PeerId {
923        &self.peer_id
924    }
925
926    /// Get the transport handle for sharing with other components.
927    pub fn transport(&self) -> &Arc<crate::transport_handle::TransportHandle> {
928        &self.transport
929    }
930
931    /// The relay-allocated public address, if this node acquired a MASQUE relay.
932    ///
933    /// Returns `Some(addr)` when the node is behind NAT and successfully
934    /// acquired a proactive relay during `start()`. External peers must dial
935    /// this address to reach the node through the relay. Returns `None` when
936    /// the node is publicly reachable or no relay was established.
937    pub async fn relay_address(&self) -> Option<SocketAddr> {
938        *self.relay_address.read().await
939    }
940
941    pub fn local_addr(&self) -> Option<MultiAddr> {
942        self.transport.local_addr()
943    }
944
945    /// Check if the node has completed the initial bootstrap process
946    ///
947    /// Returns `true` if the node has successfully connected to at least one
948    /// bootstrap peer and performed peer discovery (FIND_NODE).
949    pub fn is_bootstrapped(&self) -> bool {
950        self.is_bootstrapped.load(Ordering::SeqCst)
951    }
952
953    /// Manually trigger re-bootstrap (useful for recovery or network rejoin)
954    ///
955    /// This clears the bootstrapped state and attempts to reconnect to
956    /// bootstrap peers and discover new peers.
957    pub async fn re_bootstrap(&self) -> Result<()> {
958        self.is_bootstrapped.store(false, Ordering::SeqCst);
959        self.connect_bootstrap_peers(None).await
960    }
961
962    // =========================================================================
963    // Trust API — delegates to AdaptiveDHT
964    // =========================================================================
965
966    /// Get the trust engine for advanced use cases
967    pub fn trust_engine(&self) -> Arc<TrustEngine> {
968        self.adaptive_dht.trust_engine().clone()
969    }
970
971    /// Report a trust event for a peer.
972    ///
973    /// Core only records penalties (connection failures). Positive trust
974    /// signals are the consumer's responsibility via [`TrustEvent::ApplicationSuccess`].
975    ///
976    /// # Example
977    ///
978    /// ```rust,ignore
979    /// use saorsa_core::adaptive::TrustEvent;
980    ///
981    /// node.report_trust_event(&peer_id, TrustEvent::ApplicationSuccess(1.0)).await;
982    /// node.report_trust_event(&peer_id, TrustEvent::ConnectionFailed).await;
983    /// ```
984    pub async fn report_trust_event(&self, peer_id: &PeerId, event: TrustEvent) {
985        self.adaptive_dht.report_trust_event(peer_id, event).await;
986    }
987
988    /// Get the current trust score for a peer (0.0 to 1.0).
989    ///
990    /// Returns 0.5 (neutral) for unknown peers.
991    pub fn peer_trust(&self, peer_id: &PeerId) -> f64 {
992        self.adaptive_dht.peer_trust(peer_id)
993    }
994
995    /// Get the AdaptiveDHT component for direct access
996    pub fn adaptive_dht(&self) -> &AdaptiveDHT {
997        &self.adaptive_dht
998    }
999
1000    // =========================================================================
1001    // Request/Response API — Automatic Trust Feedback
1002    // =========================================================================
1003
1004    /// Send a request to a peer and wait for a response with automatic trust penalty reporting.
1005    ///
1006    /// Unlike fire-and-forget `send_message()`, this method:
1007    /// 1. Wraps the payload in a `RequestResponseEnvelope` with a unique message ID
1008    /// 2. Sends it on the `/rr/<protocol>` protocol prefix
1009    /// 3. Waits for a matching response (or timeout)
1010    /// 4. Automatically reports failure to the trust engine (success is the expected baseline)
1011    ///
1012    /// The remote peer's handler should call `send_response()` with the
1013    /// incoming message ID to route the response back.
1014    ///
1015    /// # Arguments
1016    ///
1017    /// * `peer_id` - Target peer
1018    /// * `protocol` - Application protocol name (e.g. `"peer_info"`)
1019    /// * `data` - Request payload bytes
1020    /// * `timeout` - Maximum time to wait for a response
1021    ///
1022    /// # Returns
1023    ///
1024    /// A `PeerResponse` on success, or an error on timeout / connection failure.
1025    ///
1026    /// # Example
1027    ///
1028    /// ```rust,ignore
1029    /// let response = node.send_request(&peer_id, "peer_info", request_data, Duration::from_secs(10)).await?;
1030    /// println!("Got {} bytes from {}", response.data.len(), response.peer_id);
1031    /// ```
1032    pub async fn send_request(
1033        &self,
1034        peer_id: &PeerId,
1035        protocol: &str,
1036        data: Vec<u8>,
1037        timeout: Duration,
1038    ) -> Result<PeerResponse> {
1039        let result = self
1040            .send_request_reconnecting(peer_id, protocol, data, timeout)
1041            .await;
1042        if let Err(ref e) = result {
1043            let event = if matches!(e, P2PError::Timeout(_)) {
1044                TrustEvent::ConnectionTimeout
1045            } else {
1046                TrustEvent::ConnectionFailed
1047            };
1048            self.report_trust_event(peer_id, event).await;
1049        }
1050        result
1051    }
1052
    /// Request/response send with reconnect-on-demand.
    ///
    /// Mirrors [`Self::send_message`]: when there is no live channel to
    /// `peer_id` it dials one (serialised per peer via
    /// [`Self::reconnect_lock_for`]) before sending, and when an existing
    /// channel turns out to be stale it tears it down, reconnects, and retries
    /// the request exactly once. The plain transport `send_request` only sends
    /// over a pre-existing channel and fails fast with `PeerNotFound`
    /// otherwise; routing request/response through this reconnecting path means
    /// a request to a peer whose QUIC connection has dropped (e.g. a periodic
    /// audit of a close peer that idled out) re-establishes the connection
    /// instead of surfacing as a spurious timeout.
    ///
    /// `timeout` bounds only the response wait inside the transport; the dial
    /// is independently bounded by `connect_peer_typed` plus
    /// [`IDENTITY_EXCHANGE_TIMEOUT`].
    ///
    /// # Errors
    ///
    /// Returns the transport error as-is in these cases:
    /// - the first attempt fails for any reason other than a stale-channel
    ///   send failure (response timeouts included),
    /// - the dial fails,
    /// - the single retry fails.
    async fn send_request_reconnecting(
        &self,
        peer_id: &PeerId,
        protocol: &str,
        data: Vec<u8>,
        timeout: Duration,
    ) -> Result<PeerResponse> {
        // Snapshot channel IDs before the send attempt — transport.send_request
        // prunes dead channels from bookkeeping but does NOT close the
        // underlying QUIC connection. We need the original IDs for
        // disconnect_channel later.
        let existing_channels = self.transport.channels_for_peer(peer_id).await;

        // No live channel — serialise dials so concurrent requests to the same
        // unconnected peer don't each open their own QUIC connection.
        if existing_channels.is_empty() {
            // Hold the per-peer reconnect lock only across the dial so
            // concurrent requests to the same cold peer collapse onto one dial —
            // not across the response wait, which would serialise every such
            // request for the full `timeout`.
            {
                let lock = self.reconnect_lock_for(peer_id);
                let _guard = lock.lock().await;
                // Another caller may have connected while we waited for the lock.
                // No address hints or stale channels are passed here: this
                // is a first dial, not a recovery.
                if !self.transport.is_peer_connected(peer_id).await {
                    self.ensure_channel(peer_id, &[], &[], &[]).await?;
                }
            }
            // A freshly dialled channel gets no stale-channel retry; any
            // failure here is returned to the caller directly.
            return self
                .transport
                .send_request(peer_id, protocol, data, timeout)
                .await;
        }

        // Snapshot addresses before the attempt — transport.send_request prunes
        // stale channels, which removes peer_info.
        let saved_addrs: Vec<MultiAddr> = self
            .transport
            .peer_info(peer_id)
            .await
            .map(|info| info.addresses)
            .unwrap_or_default();

        // Clone the payload for a possible retry — transport.send_request
        // consumes the Vec, and only stale-channel failures are retried.
        let retry_data = data.clone();

        // Fast path: try the existing connection.
        match self
            .transport
            .send_request(peer_id, protocol, data, timeout)
            .await
        {
            Ok(resp) => return Ok(resp),
            Err(e) => {
                // A response-deadline timeout means the request WAS delivered
                // but went unanswered — reconnecting would not help, so do not
                // retry. Only a stale-channel send failure warrants a redial.
                if !e.is_stale_channel_send_failure() {
                    return Err(e);
                }
                debug!(
                    peer = %peer_id.to_hex(),
                    error = %e,
                    "stale channel request failed, attempting reconnect",
                );
            }
        }

        // Serialise the reconnect (stale-channel teardown + dial) so concurrent
        // requests to the same stale peer don't race to dial, but release the
        // lock before the response wait so they don't serialise for the full
        // `timeout`.
        {
            let lock = self.reconnect_lock_for(peer_id);
            let _guard = lock.lock().await;

            // Another caller may have reconnected while we waited for the lock.
            if self.transport.is_peer_connected(peer_id).await {
                // Close stale QUIC connections that transport.send_request's
                // bookkeeping cleanup didn't tear down (it only drops the mapping).
                // NOTE(review): this assumes the channel another caller just
                // dialled never reuses an ID from `existing_channels`. If
                // channel IDs are derived from the remote address, confirm
                // this loop cannot close that fresh connection.
                for channel_id in &existing_channels {
                    self.transport.disconnect_channel(channel_id).await;
                }
            } else {
                // Redial using the addresses captured before pruning, and hand
                // over the stale channel IDs so ensure_channel can deal with them.
                self.ensure_channel(peer_id, &[], &saved_addrs, &existing_channels)
                    .await?;
            }
        }
        // Single retry over the (re)established channel; its result is final.
        self.transport
            .send_request(peer_id, protocol, retry_data, timeout)
            .await
    }
1162
1163    pub async fn send_response(
1164        &self,
1165        peer_id: &PeerId,
1166        protocol: &str,
1167        message_id: &str,
1168        data: Vec<u8>,
1169    ) -> Result<()> {
1170        self.transport
1171            .send_response(peer_id, protocol, message_id, data)
1172            .await
1173    }
1174
1175    pub fn parse_request_envelope(data: &[u8]) -> Option<(String, bool, Vec<u8>)> {
1176        crate::transport_handle::TransportHandle::parse_request_envelope(data)
1177    }
1178
1179    pub async fn subscribe(&self, topic: &str) -> Result<()> {
1180        self.transport.subscribe(topic).await
1181    }
1182
1183    pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
1184        self.transport.publish(topic, data).await
1185    }
1186
1187    /// Get the node configuration
1188    pub fn config(&self) -> &NodeConfig {
1189        &self.config
1190    }
1191
    /// Start the P2P node.
    ///
    /// In order, this:
    /// 1. starts the transport listeners (which also receive messages),
    /// 2. starts the adaptive DHT layer (DHT manager + trust engine),
    /// 3. loads the close group cache, if `close_group_cache_dir` is set, and
    ///    imports its trust scores,
    /// 4. connects to bootstrap peers, passing the loaded cache if any,
    /// 5. emits [`DhtNetworkEvent::BootstrapComplete`],
    /// 6. spawns the relay-acquisition driver (skipped in
    ///    [`NodeMode::Client`]),
    /// 7. spawns the DHT_BRIDGE task that forwards peer address updates into
    ///    the DHT, and
    /// 8. marks the node as started.
    ///
    /// A missing or unreadable cache is logged and treated as a fresh start.
    ///
    /// # Errors
    ///
    /// Returns an error if the transport listeners, the adaptive DHT layer,
    /// or the bootstrap connection step fail. In that case the started flag
    /// is not set, and layers started by earlier steps are not torn down here.
    ///
    /// NOTE(review): nothing here guards against calling `start()` twice; a
    /// second call would spawn a second driver and bridge task. Confirm that
    /// callers start the node only once ([`Self::run`] checks `is_running()`).
    pub async fn start(&self) -> Result<()> {
        info!("Starting P2P node...");

        // Start transport listeners and message receiving
        self.transport.start_network_listeners().await?;

        // Start the adaptive DHT layer (DHT manager + trust engine)
        self.adaptive_dht.start().await?;

        // Log current listen addresses
        let listen_addrs = self.transport.listen_addrs().await;
        info!("P2P node started on addresses: {:?}", listen_addrs);

        // NOTE: Message receiving is now integrated into the accept loop in start_network_listeners()
        // The old start_message_receiving_system() is no longer needed as it competed with the accept
        // loop for incoming connections, causing messages to be lost.

        // Load close group cache and import trust scores before connecting to peers.
        // This ensures trust scores are available when peers are added to the routing table.
        let close_group_cache = if let Some(ref dir) = self.config.close_group_cache_dir {
            match CloseGroupCache::load_from_dir(dir).await {
                Ok(Some(cache)) => {
                    // Filter out peers with non-finite trust scores (NaN/Inf)
                    // that could corrupt trust engine state or sort ordering.
                    let original_count = cache.peers.len();
                    let cache = CloseGroupCache {
                        peers: cache
                            .peers
                            .into_iter()
                            .filter(|p| p.trust.score.is_finite())
                            .collect(),
                        ..cache
                    };
                    // Filtering never grows the list, so this cannot underflow.
                    let filtered_count = original_count - cache.peers.len();
                    if filtered_count > 0 {
                        warn!(
                            "Filtered {filtered_count} peers with non-finite trust scores from close group cache"
                        );
                    }

                    // Seed the trust engine with the surviving peers' records.
                    let trust_snapshot = TrustSnapshot {
                        peers: cache
                            .peers
                            .iter()
                            .map(|p| (p.peer_id, p.trust.clone()))
                            .collect(),
                    };
                    self.adaptive_dht
                        .trust_engine()
                        .import_snapshot(&trust_snapshot);
                    info!(
                        "Loaded {} peers from close group cache (trust scores imported)",
                        cache.peers.len()
                    );
                    Some(cache)
                }
                Ok(None) => {
                    debug!(
                        "No close group cache found in {}, fresh start",
                        dir.display()
                    );
                    None
                }
                Err(e) => {
                    // Best-effort: a broken cache must not prevent startup.
                    warn!(
                        "Failed to load close group cache from {}: {e}",
                        dir.display()
                    );
                    None
                }
            }
        } else {
            None
        };

        // Connect to bootstrap peers
        self.connect_bootstrap_peers(close_group_cache.as_ref())
            .await?;

        // Emit BootstrapComplete — the node is connected to the network and
        // the DHT routing table is populated; consumers waiting on this
        // event can start issuing queries. The relay-acquisition driver
        // runs asynchronously after this point, so the node's published
        // self-record may be direct-only for a brief window until the
        // driver's first acquisition attempt finishes.
        {
            let dht = self.adaptive_dht.dht_manager();
            let rt_size = dht.get_routing_table_size().await;
            dht.emit_event(DhtNetworkEvent::BootstrapComplete { num_peers: rt_size });
        }

        // Spawn the relay-acquisition driver for Node mode.
        //
        // The driver unconditionally tries to acquire a MASQUE relay from
        // an XOR-closest peer right after bootstrap — there is no public/
        // private classification. Private candidates are filtered out
        // ambiently: their Direct addresses are unreachable from outside
        // their NAT, so the QUIC dial fails and the walker advances to
        // the next-closest peer.
        //
        // The driver also owns the relay-lost → republish → reacquire
        // state machine (see `reachability::driver` for the full flow).
        // Clients (`NodeMode::Client`) do not run the driver at all: they
        // are outbound-only and do not need to be reachable.
        if self.config.mode != NodeMode::Client {
            spawn_acquisition_driver(
                self.adaptive_dht.dht_manager().clone(),
                Arc::clone(&self.transport),
                Arc::clone(&self.relayer_peer_id),
                Arc::clone(&self.relay_address),
                self.shutdown.clone(),
            );
        } else {
            info!("client mode — skipping relay acquisition driver");
        }

        // DHT_BRIDGE: spawn a background task that forwards peer-advertised
        // address updates from the transport layer onto DHT routing-table
        // mutations. When a connected peer's ADD_ADDRESS notification carries
        // a different IP than the connection's source (i.e., the peer is
        // behind a relay or has migrated), the advertised address is offered
        // to the DHT as a legacy relay hint for that peer. The DHT applies it
        // only to unsequenced entries; for sequenced peers the hint is
        // ignored and logged at debug level.
        //
        // Updates are consumed in a `tokio::select!` (biased towards the
        // shutdown token) so they propagate as soon as they arrive rather than
        // on a polling interval. The task exits on shutdown or when the
        // transport's update stream ends.
        //
        // This node's OWN relay state changes are NOT handled here — the
        // relay acquisition driver (see `reachability::driver`) owns them
        // directly, because it knows the full typed address set for the
        // self-record; the bridge did not.
        {
            let transport = Arc::clone(&self.transport);
            let dht = self.adaptive_dht.dht_manager().clone();
            let shutdown = self.shutdown.clone();
            tokio::spawn(async move {
                loop {
                    tokio::select! {
                        biased;
                        _ = shutdown.cancelled() => break,
                        update = transport.recv_peer_address_update() => {
                            let Some((peer_addr, advertised_addr)) = update else { break };
                            let normalized_peer =
                                saorsa_transport::shared::normalize_socket_addr(peer_addr);
                            let normalized_adv =
                                saorsa_transport::shared::normalize_socket_addr(advertised_addr);
                            // Only update DHT when the advertised IP differs
                            // from the peer's connection IP. Same-IP updates
                            // are just different NATted ports (useless for
                            // symmetric NAT); different-IP means a relay.
                            if normalized_peer.ip() == normalized_adv.ip() {
                                debug!(
                                    "DHT_BRIDGE: dropping same-IP update peer={} addr={}",
                                    normalized_peer,
                                    normalized_adv
                                );
                                continue;
                            }
                            info!(
                                "DHT_BRIDGE: processing relay update peer={} addr={}",
                                normalized_peer,
                                normalized_adv
                            );
                            // Look up peer ID by address (tries both IPv4 and
                            // IPv4-mapped IPv6 forms via dual_stack_alternate).
                            // For symmetric NAT, this may fail because the
                            // connection's channel key uses a different NATted port.
                            if let Some(peer_id) = transport.peer_id_for_addr(&normalized_peer).await {
                                let multi_addr = MultiAddr::quic(normalized_adv);
                                info!(
                                    "Updating DHT: peer {} relay address {} (connection was {})",
                                    peer_id, advertised_addr, peer_addr
                                );
                                if !dht
                                    .touch_legacy_relay_hint_if_unsequenced(&peer_id, &multi_addr)
                                    .await
                                {
                                    debug!(
                                        "DHT_BRIDGE: ignored legacy relay hint for sequenced peer {} addr {}",
                                        peer_id, advertised_addr
                                    );
                                }
                            }
                        }
                    }
                }
            });
        }

        // Only reached once every step above succeeded.
        self.is_started
            .store(true, std::sync::atomic::Ordering::Release);

        Ok(())
    }
1412
1413    // start_network_listeners and start_message_receiving_system
1414    // are now implemented in TransportHandle
1415
1416    /// Run the P2P node (blocks until shutdown)
1417    pub async fn run(&self) -> Result<()> {
1418        if !self.is_running() {
1419            self.start().await?;
1420        }
1421
1422        info!("P2P node running...");
1423
1424        // Block until shutdown is signalled. All background work (connection
1425        // lifecycle, DHT maintenance, EigenTrust) runs in dedicated tasks.
1426        self.shutdown.cancelled().await;
1427
1428        info!("P2P node stopped");
1429        Ok(())
1430    }
1431
1432    /// Stop the P2P node
1433    pub async fn stop(&self) -> Result<()> {
1434        info!("Stopping P2P node...");
1435
1436        // Save close group cache before tearing down the DHT and transport layers.
1437        if let Some(ref dir) = self.config.close_group_cache_dir
1438            && let Err(e) = self.save_close_group_cache(dir).await
1439        {
1440            warn!("Failed to save close group cache on shutdown: {e}");
1441        }
1442
1443        // Signal the run loop to exit
1444        self.shutdown.cancel();
1445
1446        // Stop DHT layer first so leave messages can be sent while transport is still active.
1447        self.adaptive_dht.stop().await?;
1448
1449        // Stop the transport layer (shutdown endpoints, join tasks, disconnect peers)
1450        self.transport.stop().await?;
1451
1452        self.is_started
1453            .store(false, std::sync::atomic::Ordering::Release);
1454
1455        info!("P2P node stopped");
1456        Ok(())
1457    }
1458
1459    /// Graceful shutdown alias for tests
1460    pub async fn shutdown(&self) -> Result<()> {
1461        self.stop().await
1462    }
1463
1464    /// Check if the node is running
1465    pub fn is_running(&self) -> bool {
1466        self.is_started.load(std::sync::atomic::Ordering::Acquire) && !self.shutdown.is_cancelled()
1467    }
1468
1469    /// Get the current listen addresses
1470    pub async fn listen_addrs(&self) -> Vec<MultiAddr> {
1471        self.transport.listen_addrs().await
1472    }
1473
1474    /// Get connected peers
1475    pub async fn connected_peers(&self) -> Vec<PeerId> {
1476        self.transport.connected_peers().await
1477    }
1478
1479    /// Get peer count
1480    pub async fn peer_count(&self) -> usize {
1481        self.transport.peer_count().await
1482    }
1483
1484    /// Get peer info
1485    pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1486        self.transport.peer_info(peer_id).await
1487    }
1488
1489    /// Get the channel ID for a given address, if connected (internal only).
1490    #[allow(dead_code)]
1491    pub(crate) async fn get_channel_id_by_address(&self, addr: &MultiAddr) -> Option<String> {
1492        self.transport.get_channel_id_by_address(addr).await
1493    }
1494
1495    /// List all active transport-level connections (internal only).
1496    #[allow(dead_code)]
1497    pub(crate) async fn list_active_connections(&self) -> Vec<(String, Vec<MultiAddr>)> {
1498        self.transport.list_active_connections().await
1499    }
1500
1501    /// Remove a channel from the peers map (internal only).
1502    #[allow(dead_code)]
1503    pub(crate) async fn remove_channel(&self, channel_id: &str) -> bool {
1504        self.transport.remove_channel(channel_id).await
1505    }
1506
1507    /// Close a channel's QUIC connection and remove it from all tracking maps.
1508    ///
1509    /// Use when a transport-level connection was established but identity
1510    /// exchange failed, so no [`PeerId`] is available for [`disconnect_peer`].
1511    pub(crate) async fn disconnect_channel(&self, channel_id: &str) {
1512        self.transport.disconnect_channel(channel_id).await;
1513    }
1514
1515    /// Check if an authenticated peer is connected (has at least one active channel).
1516    pub async fn is_peer_connected(&self, peer_id: &PeerId) -> bool {
1517        self.transport.is_peer_connected(peer_id).await
1518    }
1519
1520    /// Connect to a peer, returning the transport-level channel ID.
1521    ///
1522    /// The returned channel ID is **not** the app-level [`PeerId`]. To obtain
1523    /// the authenticated peer identity, call
1524    /// [`wait_for_peer_identity`](Self::wait_for_peer_identity) with the
1525    /// returned channel ID.
1526    ///
1527    /// Callers that already know how the address was classified should
1528    /// prefer [`Self::connect_peer_typed`] so the resulting log line
1529    /// carries an accurate `kind` field instead of `unknown`.
1530    pub async fn connect_peer(&self, address: &MultiAddr) -> Result<String> {
1531        self.transport.connect_peer(address).await
1532    }
1533
1534    /// Connect to a peer at the given typed address.
1535    ///
1536    /// Same as [`Self::connect_peer`] but threads the [`AddressType`]
1537    /// through to the transport-level dial log so an operator can tell,
1538    /// after the fact, whether a failed dial was against a `Direct`,
1539    /// `Relay`, `Unverified`, or `Lan` address.
1540    pub async fn connect_peer_typed(
1541        &self,
1542        address: &MultiAddr,
1543        kind: AddressType,
1544    ) -> Result<String> {
1545        self.transport.connect_peer_typed(address, kind).await
1546    }
1547
1548    /// Wait for the identity exchange on `channel_id` to complete, returning
1549    /// the authenticated [`PeerId`].
1550    ///
1551    /// Use this after [`connect_peer`](Self::connect_peer) to bridge the gap
1552    /// between the transport-level channel ID and the app-level peer identity
1553    /// required by [`send_message`](Self::send_message).
1554    pub async fn wait_for_peer_identity(
1555        &self,
1556        channel_id: &str,
1557        timeout: Duration,
1558    ) -> Result<PeerId> {
1559        self.transport
1560            .wait_for_peer_identity(channel_id, timeout)
1561            .await
1562    }
1563
1564    /// Disconnect from a peer
1565    pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1566        self.transport.disconnect_peer(peer_id).await
1567    }
1568
1569    /// Check if a connection to a peer is active (internal only).
1570    #[allow(dead_code)]
1571    pub(crate) async fn is_connection_active(&self, channel_id: &str) -> bool {
1572        self.transport.is_connection_active(channel_id).await
1573    }
1574
1575    /// Send a message to an authenticated peer, reconnecting on demand.
1576    ///
1577    /// Tries the existing connection first. If the send fails (stale QUIC
1578    /// session, peer not found, etc.), resolves a dial address from:
1579    ///
1580    /// 1. Caller-provided `addrs` (highest priority)
1581    /// 2. Addresses cached in the transport layer (snapshotted before the
1582    ///    send attempt, since stale-channel cleanup removes them)
1583    /// 3. DHT routing table
1584    ///
1585    /// Then dials, waits for identity exchange, and retries the send exactly
1586    /// once on the fresh connection.  Concurrent reconnects to the same peer
1587    /// are serialised so only one dial is attempted at a time.
1588    pub async fn send_message(
1589        &self,
1590        peer_id: &PeerId,
1591        protocol: &str,
1592        data: Vec<u8>,
1593        addrs: &[MultiAddr],
1594    ) -> Result<()> {
1595        // Snapshot channel IDs before the send attempt — transport.send_message
1596        // prunes dead channels from bookkeeping but does NOT close the
1597        // underlying QUIC connection.  We need the original IDs for
1598        // disconnect_channel later.
1599        let existing_channels = self.transport.channels_for_peer(peer_id).await;
1600
1601        // No existing connection — serialise so concurrent sends to the same
1602        // unconnected peer don't each open their own QUIC connection.
1603        if existing_channels.is_empty() {
1604            let lock = self.reconnect_lock_for(peer_id);
1605            let _guard = lock.lock().await;
1606
1607            // Another sender may have connected while we waited for the lock.
1608            if self.transport.is_peer_connected(peer_id).await {
1609                return self.transport.send_message(peer_id, protocol, data).await;
1610            }
1611
1612            return self
1613                .reconnect_and_send(peer_id, protocol, data, addrs, &[], &[])
1614                .await;
1615        }
1616
1617        // Snapshot addresses before the send attempt — transport.send_message
1618        // prunes stale channels, which removes peer_info.
1619        let saved_addrs: Vec<MultiAddr> = self
1620            .transport
1621            .peer_info(peer_id)
1622            .await
1623            .map(|info| info.addresses)
1624            .unwrap_or_default();
1625
1626        // Clone data for retry — only stale-channel failures are retried, but
1627        // transport.send_message consumes the Vec.
1628        let retry_data = data.clone();
1629
1630        // Fast path: try existing connection.
1631        let send_result = self.transport.send_message(peer_id, protocol, data).await;
1632        match send_result {
1633            Ok(()) => return Ok(()),
1634            Err(e) => {
1635                if !e.is_stale_channel_send_failure() {
1636                    debug!(
1637                        peer = %peer_id.to_hex(),
1638                        error = %e,
1639                        "send failed during active channel use, not reconnecting",
1640                    );
1641                    return Err(e);
1642                }
1643
1644                debug!(
1645                    peer = %peer_id.to_hex(),
1646                    error = %e,
1647                    "stale channel send failed, attempting reconnect",
1648                );
1649            }
1650        }
1651
1652        // Serialise reconnect attempts so concurrent sends to the same
1653        // stale peer don't race to dial.
1654        let lock = self.reconnect_lock_for(peer_id);
1655        let _guard = lock.lock().await;
1656
1657        // Another sender may have reconnected while we waited for the lock.
1658        if self.transport.is_peer_connected(peer_id).await {
1659            // Close stale QUIC connections that remove_channel (called inside
1660            // transport.send_message on failure) didn't tear down — it only
1661            // removes bookkeeping, not the underlying QUIC session.
1662            for channel_id in &existing_channels {
1663                self.transport.disconnect_channel(channel_id).await;
1664            }
1665            return self
1666                .transport
1667                .send_message(peer_id, protocol, retry_data)
1668                .await;
1669        }
1670
1671        self.reconnect_and_send(
1672            peer_id,
1673            protocol,
1674            retry_data,
1675            addrs,
1676            &saved_addrs,
1677            &existing_channels,
1678        )
1679        .await
1680    }
1681
1682    /// Ensure an identity-authenticated channel to `peer_id` exists, dialing a
1683    /// fresh connection when necessary.
1684    ///
1685    /// Resolves a dial address (caller-provided > saved > DHT routing table),
1686    /// tears down any stale channels, dials, waits for the identity exchange,
1687    /// and verifies the authenticated peer matches `peer_id`. On success the
1688    /// transport's `peer_to_channel` map is populated, so a subsequent
1689    /// `send_message` / `send_request` finds the channel instead of failing
1690    /// with `PeerNotFound`. Returns `PeerNotFound` when no dialable address is
1691    /// available.
1692    ///
1693    /// Shared by [`Self::reconnect_and_send`] and
1694    /// [`Self::send_request_reconnecting`] so both gain identical dial
1695    /// behaviour.
1696    async fn ensure_channel(
1697        &self,
1698        peer_id: &PeerId,
1699        addrs: &[MultiAddr],
1700        saved_addrs: &[MultiAddr],
1701        stale_channels: &[String],
1702    ) -> Result<()> {
1703        // Resolve a dial address: caller-provided > saved > DHT.
1704        let (address, kind) = self
1705            .resolve_dial_address(peer_id, addrs, saved_addrs)
1706            .await
1707            .ok_or_else(|| {
1708                P2PError::Network(NetworkError::PeerNotFound(peer_id.to_hex().into()))
1709            })?;
1710
1711        // Tear down stale QUIC connections using their actual channel IDs.
1712        // transport.send_message only removes bookkeeping (peer_to_channel,
1713        // peers, active_connections) — it does NOT close the underlying QUIC
1714        // connection.  We must use the real channel IDs, not the resolved
1715        // dial address, because NAT / port migration can make them differ.
1716        if !stale_channels.is_empty() {
1717            for channel_id in stale_channels {
1718                self.transport.disconnect_channel(channel_id).await;
1719            }
1720            tokio::time::sleep(QUIC_TEARDOWN_GRACE).await;
1721        }
1722
1723        // Dial and wait for identity exchange.
1724        let channel_id = self.transport.connect_peer_typed(&address, kind).await?;
1725        let authenticated = match self
1726            .transport
1727            .wait_for_peer_identity(&channel_id, IDENTITY_EXCHANGE_TIMEOUT)
1728            .await
1729        {
1730            Ok(peer) => peer,
1731            Err(e) => {
1732                // Close the freshly-dialed QUIC connection so it doesn't
1733                // linger as a zombie until idle timeout.
1734                self.transport.disconnect_channel(&channel_id).await;
1735                return Err(e);
1736            }
1737        };
1738
1739        if &authenticated != peer_id {
1740            self.transport.disconnect_channel(&channel_id).await;
1741            return Err(P2PError::Identity(IdentityError::IdentityMismatch {
1742                expected: peer_id.to_hex().into(),
1743                actual: authenticated.to_hex().into(),
1744            }));
1745        }
1746
1747        Ok(())
1748    }
1749
1750    /// Tear down stale channels, reconnect to a peer, and send a message.
1751    async fn reconnect_and_send(
1752        &self,
1753        peer_id: &PeerId,
1754        protocol: &str,
1755        data: Vec<u8>,
1756        addrs: &[MultiAddr],
1757        saved_addrs: &[MultiAddr],
1758        stale_channels: &[String],
1759    ) -> Result<()> {
1760        self.ensure_channel(peer_id, addrs, saved_addrs, stale_channels)
1761            .await?;
1762        // Send on the fresh connection.
1763        self.transport.send_message(peer_id, protocol, data).await
1764    }
1765
1766    /// Resolve a dial address for `peer_id`, preferring caller-provided
1767    /// addresses over cached/DHT sources.
1768    ///
1769    /// Returns the first dialable (QUIC, non-unspecified) address found,
1770    /// paired with the [`AddressType`] the DHT routing table believes
1771    /// for that address. Caller-provided / saved addresses that don't
1772    /// appear in the routing table fall back to
1773    /// [`AddressType::Unverified`] — the same default the routing table
1774    /// applies to legacy peers that never asserted reachability.
1775    /// Returns `None` when no dialable address is available.
1776    async fn resolve_dial_address(
1777        &self,
1778        peer_id: &PeerId,
1779        caller_addrs: &[MultiAddr],
1780        saved_addrs: &[MultiAddr],
1781    ) -> Option<(MultiAddr, AddressType)> {
1782        // Caller- and saved-supplied addresses skip the routing-table read.
1783        // The kind is only consumed as a log tag by `connect_peer_typed`, so
1784        // defaulting to Unverified — the same fallback the routing table
1785        // applies to legacy peers — saves an async lookup on the hot
1786        // reconnect path. Only consult the DHT when both upstream sources
1787        // are exhausted.
1788        if let Some(addr) = Self::first_dialable(caller_addrs) {
1789            return Some((addr, AddressType::Unverified));
1790        }
1791        if let Some(addr) = Self::first_dialable(saved_addrs) {
1792            return Some((addr, AddressType::Unverified));
1793        }
1794
1795        self.adaptive_dht
1796            .peer_addresses_for_dial_typed(peer_id)
1797            .await
1798            .into_iter()
1799            .find(|(a, _)| {
1800                a.dialable_socket_addr()
1801                    .is_some_and(|sa| !sa.ip().is_unspecified())
1802            })
1803    }
1804
1805    /// Return the first dialable QUIC address from a slice, skipping
1806    /// non-QUIC and unspecified (`0.0.0.0` / `::`) addresses.
1807    fn first_dialable(addrs: &[MultiAddr]) -> Option<MultiAddr> {
1808        addrs
1809            .iter()
1810            .find(|a| {
1811                let dialable = a
1812                    .dialable_socket_addr()
1813                    .is_some_and(|sa| !sa.ip().is_unspecified());
1814                if !dialable {
1815                    trace!(address = %a, "skipping non-dialable address");
1816                }
1817                dialable
1818            })
1819            .cloned()
1820    }
1821
1822    /// Get or create a per-peer reconnect lock.
1823    fn reconnect_lock_for(&self, peer_id: &PeerId) -> Arc<TokioMutex<()>> {
1824        self.reconnect_locks
1825            .lock()
1826            .entry(*peer_id)
1827            .or_insert_with(|| Arc::new(TokioMutex::new(())))
1828            .clone()
1829    }
1830}
1831
1832/// Convenience constructor for `P2PError::Network(NetworkError::ProtocolError(...))`.
1833fn protocol_error(msg: impl std::fmt::Display) -> P2PError {
1834    P2PError::Network(NetworkError::ProtocolError(msg.to_string().into()))
1835}
1836
1837/// Helper to send an event via a broadcast sender, logging at trace level if no receivers.
1838pub(crate) fn broadcast_event(tx: &broadcast::Sender<P2PEvent>, event: P2PEvent) {
1839    if let Err(e) = tx.send(event) {
1840        tracing::trace!("Event broadcast has no receivers: {e}");
1841    }
1842}
1843
1844/// Result of parsing a protocol message, including optional authenticated identity.
1845///
1846/// The signed wire timestamp is carried on the inner [`P2PEvent::Message`]
1847/// (see its `timestamp` field) so subscribers can apply their own freshness
1848/// or replay policy now that the wire-level skew gate is gone.
1849pub(crate) struct ParsedMessage {
1850    /// The P2P event to broadcast.
1851    pub(crate) event: P2PEvent,
1852    /// If the message was signed and verified, the authenticated app-level [`PeerId`].
1853    pub(crate) authenticated_node_id: Option<PeerId>,
1854    /// The sender's user agent string from the wire message.
1855    pub(crate) user_agent: String,
1856}
1857
1858/// Parse a postcard-encoded protocol message into a `P2PEvent::Message`.
1859///
1860/// Returns `None` if the bytes cannot be deserialized as a valid `WireMessage`.
1861///
1862/// The `from` field is a required part of the wire protocol but is **not**
1863/// used as the event source. Instead, `source` — the transport-level peer ID
1864/// derived from the authenticated QUIC connection — is used so that consumers
1865/// can pass it directly to `send_message()`. This eliminates a spoofing
1866/// vector where a peer could claim an arbitrary identity via the payload.
1867pub(crate) fn parse_protocol_message(bytes: &[u8], source: &str) -> Option<ParsedMessage> {
1868    let message: WireMessage = postcard::from_bytes(bytes).ok()?;
1869    let transport_source = source.parse::<SocketAddr>().ok().map(MultiAddr::quic);
1870
1871    // Verify app-level signature if present
1872    let authenticated_node_id = if !message.signature.is_empty() {
1873        match verify_message_signature(&message) {
1874            Ok(peer_id) => {
1875                debug!(
1876                    "Message from {} authenticated as app-level NodeId {}",
1877                    source, peer_id
1878                );
1879                Some(peer_id)
1880            }
1881            Err(e) => {
1882                warn!(
1883                    "Rejecting message from {}: signature verification failed: {}",
1884                    source, e
1885                );
1886                return None;
1887            }
1888        }
1889    } else {
1890        None
1891    };
1892
1893    debug!(
1894        "Parsed P2PEvent::Message - topic: {}, source: {:?} (transport: {}, logical: {}), payload_len: {}",
1895        message.protocol,
1896        authenticated_node_id,
1897        source,
1898        message.from,
1899        message.data.len()
1900    );
1901
1902    Some(ParsedMessage {
1903        event: P2PEvent::Message {
1904            topic: message.protocol,
1905            source: authenticated_node_id,
1906            transport_source,
1907            timestamp: message.timestamp,
1908            data: message.data,
1909        },
1910        authenticated_node_id,
1911        user_agent: message.user_agent,
1912    })
1913}
1914
1915/// Verify the ML-DSA-65 signature on a WireMessage and return the authenticated [`PeerId`].
1916///
1917/// Besides verifying the cryptographic signature, this also checks that the
1918/// self-asserted `from` field matches the [`PeerId`] derived from the public
1919/// key. This prevents a sender from signing with their real key while
1920/// claiming a different identity in the `from` field.
1921fn verify_message_signature(message: &WireMessage) -> std::result::Result<PeerId, String> {
1922    let pubkey = MlDsaPublicKey::from_bytes(&message.public_key)
1923        .map_err(|e| format!("invalid public key: {e:?}"))?;
1924
1925    let peer_id = peer_id_from_public_key(&pubkey);
1926
1927    // Validate that the self-asserted `from` field matches the public key.
1928    if message.from != peer_id {
1929        return Err(format!(
1930            "from field mismatch: message claims '{}' but public key derives '{}'",
1931            message.from, peer_id
1932        ));
1933    }
1934
1935    let signable = postcard::to_stdvec(&(
1936        &message.protocol,
1937        &message.data as &[u8],
1938        &message.from,
1939        message.timestamp,
1940        &message.user_agent,
1941    ))
1942    .map_err(|e| format!("failed to serialize signable bytes: {e}"))?;
1943
1944    let sig = MlDsaSignature::from_bytes(&message.signature)
1945        .map_err(|e| format!("invalid signature: {e:?}"))?;
1946
1947    let valid = crate::quantum_crypto::ml_dsa_verify(&pubkey, &signable, &sig)
1948        .map_err(|e| format!("verification error: {e}"))?;
1949
1950    if valid {
1951        Ok(peer_id)
1952    } else {
1953        Err("signature is invalid".to_string())
1954    }
1955}
1956
1957impl P2PNode {
1958    /// Subscribe to network events
1959    pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1960        self.transport.subscribe_events()
1961    }
1962
1963    /// Backwards-compat event stream accessor for tests
1964    pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1965        self.subscribe_events()
1966    }
1967
1968    /// Get node uptime
1969    pub fn uptime(&self) -> Duration {
1970        self.start_time.elapsed()
1971    }
1972
    // MCP support was removed, including the former methods for handling
    // remote MCP tool calls, listing a remote peer's tools, and reporting MCP
    // server statistics.

    // Background tasks (connection_lifecycle_monitor, keepalive, periodic_maintenance)
    // are now implemented in TransportHandle.
1983
1984    /// Check system health
1985    pub async fn health_check(&self) -> Result<()> {
1986        let peer_count = self.peer_count().await;
1987        if peer_count > self.config.max_connections {
1988            Err(protocol_error(format!(
1989                "Too many connections: {peer_count}"
1990            )))
1991        } else {
1992            Ok(())
1993        }
1994    }
1995
1996    /// Get the attached DHT manager.
1997    pub fn dht_manager(&self) -> &Arc<DhtNetworkManager> {
1998        self.adaptive_dht.dht_manager()
1999    }
2000
2001    /// Backwards-compatible alias for `dht_manager()`.
2002    pub fn dht(&self) -> &Arc<DhtNetworkManager> {
2003        self.dht_manager()
2004    }
2005
2006    /// Connect to bootstrap peers and perform initial peer discovery.
2007    ///
2008    /// If a `close_group_cache` was loaded on startup, its peers are injected
2009    /// as the highest-priority addresses before configured bootstrap peers.
2010    /// Their trust scores were already imported into the `TrustEngine` before
2011    /// this method is called.
2012    async fn connect_bootstrap_peers(
2013        &self,
2014        close_group_cache: Option<&CloseGroupCache>,
2015    ) -> Result<()> {
2016        // Each entry is a list of addresses for a single peer. Close-group
2017        // peers are dialed serially to preserve trust-priority ordering;
2018        // configured bootstrap peers are dialed concurrently to cut cold-start
2019        // latency when some peers are slow or dead.
2020        let mut serial_addr_sets: Vec<Vec<MultiAddr>> = Vec::new();
2021        let mut parallel_addr_sets: Vec<Vec<MultiAddr>> = Vec::new();
2022        let mut seen_addresses = std::collections::HashSet::new();
2023
2024        // Priority 0: Cached close group peers (pre-trusted, highest priority).
2025        // These peers had trust scores loaded into the TrustEngine earlier in start(),
2026        // so they are already known-good when added to the routing table.
2027        // Sorted by trust score (highest first), then XOR distance (closest first)
2028        // as tiebreaker so we reconnect to the most trusted, closest peers first.
2029        if let Some(cache) = close_group_cache {
2030            let mut sorted_peers: Vec<&CachedCloseGroupPeer> = cache.peers.iter().collect();
2031            sorted_peers.sort_by(|a, b| {
2032                // NaN-safe comparison: push NaN scores to the back instead
2033                // of treating them as equal (which would silently promote
2034                // corrupted entries to the front of the reconnection queue).
2035                let score_ord = match b.trust.score.partial_cmp(&a.trust.score) {
2036                    Some(ord) => ord,
2037                    None => {
2038                        if a.trust.score.is_nan() {
2039                            std::cmp::Ordering::Greater // a is NaN, push to back
2040                        } else {
2041                            std::cmp::Ordering::Less // b is NaN, push b to back
2042                        }
2043                    }
2044                };
2045                score_ord.then_with(|| {
2046                    let da = self.peer_id.xor_distance(&a.peer_id);
2047                    let db = self.peer_id.xor_distance(&b.peer_id);
2048                    da.cmp(&db)
2049                })
2050            });
2051
2052            let mut added_from_close_group = 0usize;
2053            for peer in &sorted_peers {
2054                let new_addresses: Vec<MultiAddr> = peer
2055                    .addresses
2056                    .iter()
2057                    .filter(|a| {
2058                        a.dialable_socket_addr()
2059                            .is_some_and(|sa| !seen_addresses.contains(&sa))
2060                    })
2061                    .cloned()
2062                    .collect();
2063
2064                if !new_addresses.is_empty() {
2065                    for addr in &new_addresses {
2066                        if let Some(sa) = addr.socket_addr() {
2067                            seen_addresses.insert(sa);
2068                        }
2069                    }
2070                    serial_addr_sets.push(new_addresses);
2071                    added_from_close_group += 1;
2072                }
2073            }
2074            if added_from_close_group > 0 {
2075                info!(
2076                    "Added {} close group cache peers (highest trust first)",
2077                    added_from_close_group
2078                );
2079            }
2080        }
2081
2082        // Priority 1: Configured bootstrap peers.
2083        if !self.config.bootstrap_peers.is_empty() {
2084            info!(
2085                "Using {} configured bootstrap peers (priority)",
2086                self.config.bootstrap_peers.len()
2087            );
2088            for multiaddr in &self.config.bootstrap_peers {
2089                let Some(socket_addr) = multiaddr.dialable_socket_addr() else {
2090                    warn!("Skipping non-QUIC bootstrap peer: {}", multiaddr);
2091                    continue;
2092                };
2093                seen_addresses.insert(socket_addr);
2094                parallel_addr_sets.push(vec![multiaddr.clone()]);
2095            }
2096        }
2097
2098        if serial_addr_sets.is_empty() && parallel_addr_sets.is_empty() {
2099            info!("No bootstrap peers configured");
2100            return Ok(());
2101        }
2102
2103        // Connect to bootstrap peers, wait for identity exchange, then
2104        // perform DHT peer discovery using the real cryptographic PeerIds.
2105        let identity_timeout = Duration::from_secs(BOOTSTRAP_IDENTITY_TIMEOUT_SECS);
2106        let mut successful_connections = 0;
2107        let mut connected_peer_ids: Vec<PeerId> = Vec::new();
2108
2109        // Phase A: serial close-group dials to preserve trust-priority ordering.
2110        let client_mode = matches!(self.config.mode, NodeMode::Client);
2111        for addrs in &serial_addr_sets {
2112            if let Some(peer_id) = self.dial_bootstrap_addr_set(addrs, identity_timeout).await {
2113                successful_connections += 1;
2114                connected_peer_ids.push(peer_id);
2115                if client_mode && successful_connections >= CLIENT_BOOTSTRAP_TARGET {
2116                    debug!(
2117                        "Client bootstrap target reached ({successful_connections} peers) — skipping remaining serial dials"
2118                    );
2119                    break;
2120                }
2121            }
2122        }
2123
2124        // Phase B: concurrent dials of configured bootstrap peers, bounded by
2125        // `MAX_CONCURRENT_BOOTSTRAP_DIALS` to cap simultaneous QUIC+PQC
2126        // handshakes. Skipped entirely when a client has already hit its
2127        // target during Phase A.
2128        if !client_mode || successful_connections < CLIENT_BOOTSTRAP_TARGET {
2129            let mut parallel_stream =
2130                futures::stream::iter(parallel_addr_sets.into_iter().map(|addrs| async move {
2131                    self.dial_bootstrap_addr_set(&addrs, identity_timeout).await
2132                }))
2133                .buffer_unordered(MAX_CONCURRENT_BOOTSTRAP_DIALS);
2134            while let Some(result) = parallel_stream.next().await {
2135                if let Some(peer_id) = result {
2136                    successful_connections += 1;
2137                    connected_peer_ids.push(peer_id);
2138                    if client_mode && successful_connections >= CLIENT_BOOTSTRAP_TARGET {
2139                        debug!(
2140                            "Client bootstrap target reached ({successful_connections} peers) — cancelling pending dials"
2141                        );
2142                        break;
2143                    }
2144                }
2145            }
2146            // `parallel_stream` is dropped here when the `if` block exits,
2147            // cancelling any in-flight futures inside `buffer_unordered`
2148            // before we proceed to the DHT discovery phase below.
2149        }
2150
2151        if successful_connections == 0 {
2152            // Outbound connections failed — but for nodes behind symmetric NAT,
2153            // the bootstrap peer may have already connected INBOUND to us.
2154            // Wait briefly and check if we have any transport-level connections.
2155            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
2156            let transport_peers = self.transport.connected_peers().await;
2157            if !transport_peers.is_empty() {
2158                info!(
2159                    "No outbound bootstrap succeeded, but {} inbound peer(s) connected — proceeding with DHT bootstrap",
2160                    transport_peers.len()
2161                );
2162                connected_peer_ids = transport_peers;
2163                successful_connections = connected_peer_ids.len();
2164            } else {
2165                warn!("Failed to connect to any bootstrap peers");
2166                // Starting a node should not be gated on immediate bootstrap connectivity.
2167                // Keep running and allow background discovery / retries to populate peers later.
2168                return Ok(());
2169            }
2170        }
2171
2172        info!(
2173            "Successfully connected to {} bootstrap peers",
2174            successful_connections
2175        );
2176
2177        // Perform DHT peer discovery from connected bootstrap peers.
2178        match self
2179            .dht_manager()
2180            .bootstrap_from_peers(&connected_peer_ids)
2181            .await
2182        {
2183            Ok(count) => info!("DHT peer discovery found {} peers", count),
2184            Err(e) => warn!("DHT peer discovery failed: {}", e),
2185        }
2186
2187        // Perform two consecutive self-lookups to fully refresh the close
2188        // neighborhood. The second lookup may discover peers that joined or
2189        // became reachable during the first lookup (Section 11.2 step 5).
2190        //
2191        // Client-mode nodes don't serve the DHT, so they don't need an
2192        // accurate close neighborhood — they just need enough peers to route
2193        // lookups for their own requests, which `bootstrap_from_peers` above
2194        // already provides. Skipping the self-lookups here cuts cold-start
2195        // latency by tens of seconds when α-sized batches include dead peers.
2196        if matches!(self.config.mode, NodeMode::Node) {
2197            const SELF_LOOKUP_ROUNDS: u8 = 2;
2198            for i in 1..=SELF_LOOKUP_ROUNDS {
2199                if let Err(e) = self.dht_manager().trigger_self_lookup().await {
2200                    warn!("Post-bootstrap self-lookup {i}/{SELF_LOOKUP_ROUNDS} failed: {e}");
2201                } else {
2202                    debug!("Post-bootstrap self-lookup {i}/{SELF_LOOKUP_ROUNDS} completed");
2203                }
2204            }
2205        } else {
2206            debug!("Skipping post-bootstrap self-lookups (client mode)");
2207        }
2208
2209        // Mark node as bootstrapped - we have connected to bootstrap peers
2210        // and initiated peer discovery
2211        self.is_bootstrapped.store(true, Ordering::SeqCst);
2212        info!(
2213            "Bootstrap complete: connected to {} peers, initiated {} discovery requests",
2214            successful_connections,
2215            connected_peer_ids.len()
2216        );
2217
2218        // Save close group cache after initial bootstrap so a crash before
2219        // graceful shutdown still preserves the newly-discovered close group.
2220        if let Some(ref dir) = self.config.close_group_cache_dir
2221            && let Err(e) = self.save_close_group_cache(dir).await
2222        {
2223            warn!("Failed to save close group cache after bootstrap: {e}");
2224        }
2225
2226        Ok(())
2227    }
2228
2229    /// Dial a single bootstrap peer's address set, stopping at the first
2230    /// address that completes the identity handshake. Returns the remote peer's
2231    /// cryptographic PeerId on success. Safe to call concurrently for different
2232    /// peers.
2233    async fn dial_bootstrap_addr_set(
2234        &self,
2235        addrs: &[MultiAddr],
2236        identity_timeout: Duration,
2237    ) -> Option<PeerId> {
2238        for addr in addrs {
2239            // Bootstrap addresses come from operator-supplied seeds (CLI
2240            // flags or config file). The local reachability classifier hasn't
2241            // proven them yet, so log them as `Unverified` rather than
2242            // `unknown`.
2243            match self
2244                .transport
2245                .connect_peer_typed(addr, AddressType::Unverified)
2246                .await
2247            {
2248                Ok(channel_id) => match self
2249                    .transport
2250                    .wait_for_peer_identity(&channel_id, identity_timeout)
2251                    .await
2252                {
2253                    Ok(real_peer_id) => return Some(real_peer_id),
2254                    Err(e) => {
2255                        warn!(
2256                            "Timeout waiting for identity from bootstrap peer {}: {}, \
2257                             closing channel {}",
2258                            addr, e, channel_id
2259                        );
2260                        self.disconnect_channel(&channel_id).await;
2261                    }
2262                },
2263                Err(e) => {
2264                    warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
2265                }
2266            }
2267        }
2268        None
2269    }
2270
2271    /// Persist the current close group peers and their trust scores to disk.
2272    async fn save_close_group_cache(&self, dir: &Path) -> anyhow::Result<()> {
2273        let key: crate::dht::Key = *self.peer_id.as_bytes();
2274        let k_value = self.config.dht_config.k_value;
2275        let close_group = self
2276            .dht_manager()
2277            .find_closest_nodes_local(&key, k_value)
2278            .await;
2279
2280        if close_group.is_empty() {
2281            debug!("No close group peers to save");
2282            return Ok(());
2283        }
2284
2285        let trust_engine = self.adaptive_dht.trust_engine();
2286        let now_epoch = SystemTime::now()
2287            .duration_since(UNIX_EPOCH)
2288            .map(|d| d.as_secs())
2289            .unwrap_or(0);
2290
2291        let peers: Vec<CachedCloseGroupPeer> = close_group
2292            .into_iter()
2293            .filter_map(|dht_node| {
2294                let score = trust_engine.score(&dht_node.peer_id);
2295                // Guard against NaN/Infinity — serde_json cannot round-trip
2296                // non-finite f64 values, which would corrupt the cache file.
2297                if !score.is_finite() {
2298                    return None;
2299                }
2300                Some(CachedCloseGroupPeer {
2301                    peer_id: dht_node.peer_id,
2302                    addresses: dht_node.addresses,
2303                    trust: TrustRecord {
2304                        score,
2305                        last_updated_epoch_secs: now_epoch,
2306                    },
2307                })
2308            })
2309            .collect();
2310
2311        let peer_count = peers.len();
2312        let cache = CloseGroupCache {
2313            peers,
2314            saved_at_epoch_secs: now_epoch,
2315        };
2316
2317        cache.save_to_dir(dir).await?;
2318        info!(
2319            "Saved {} close group peers to cache in {}",
2320            peer_count,
2321            dir.display()
2322        );
2323        Ok(())
2324    }
2325
2326    // disconnect_all_peers and periodic_tasks are now in TransportHandle
2327}
2328
2329/// Network sender trait for sending messages
2330#[async_trait::async_trait]
2331#[allow(dead_code)]
2332pub trait NetworkSender: Send + Sync {
2333    /// Send a message to an authenticated peer.
2334    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;
2335
2336    /// Get our local peer ID (cryptographic identity).
2337    fn local_peer_id(&self) -> PeerId;
2338}
2339
2340// P2PNetworkSender removed — NetworkSender is now implemented directly on TransportHandle.
2341// NodeBuilder removed — use NodeConfigBuilder + P2PNode::new() instead.
2342
2343/// Helper function to register a new channel.
2344///
2345/// Sync because the underlying map is a sharded `DashMap` — no `.await` is
2346/// needed to take a write lock. Keeping this sync is what lets the inbound
2347/// accept loop in `TransportHandle` insert without yielding, so it cannot
2348/// stall and back-pressure the upstream handshake channel.
2349pub(crate) fn register_new_channel(
2350    peers: &DashMap<String, PeerInfo>,
2351    channel_id: &str,
2352    remote_addr: &MultiAddr,
2353) {
2354    let peer_info = PeerInfo {
2355        channel_id: channel_id.to_owned(),
2356        addresses: vec![remote_addr.clone()],
2357        connected_at: tokio::time::Instant::now(),
2358        last_seen: tokio::time::Instant::now(),
2359        status: ConnectionStatus::Connected,
2360        protocols: vec!["p2p-core/1.0.0".to_string()],
2361        heartbeat_count: 0,
2362    };
2363    peers.insert(channel_id.to_owned(), peer_info);
2364}
2365
2366#[cfg(test)]
2367mod tests {
2368    use super::*;
2369    // MCP removed from tests
2370    use std::time::Duration;
2371    use tokio::time::timeout;
2372
2373    /// 2 MiB — used in builder tests to verify max_message_size configuration.
2374    const TEST_MAX_MESSAGE_SIZE: usize = 2 * 1024 * 1024;
2375
2376    // Test tool handler for network tests
2377
2378    // MCP removed
2379
2380    /// Helper function to create a test node configuration
2381    fn create_test_node_config() -> NodeConfig {
2382        NodeConfig {
2383            local: true,
2384            port: 0,
2385            ipv6: true,
2386            bootstrap_peers: vec![],
2387            connection_timeout: Duration::from_secs(2),
2388            max_connections: 100,
2389            dht_config: DHTConfig::default(),
2390            diversity_config: None,
2391            max_message_size: None,
2392            node_identity: None,
2393            mode: NodeMode::default(),
2394            custom_user_agent: None,
2395            allow_loopback: true,
2396            adaptive_dht_config: AdaptiveDhtConfig::default(),
2397            close_group_cache_dir: None,
2398        }
2399    }
2400
2401    /// Helper function to create a test tool
2402    // MCP removed: test tool helper deleted
2403
2404    #[tokio::test]
2405    async fn test_node_config_default() {
2406        let config = NodeConfig::default();
2407
2408        assert_eq!(config.listen_addrs().len(), 2); // IPv4 + IPv6
2409        assert_eq!(config.max_connections, 10000);
2410        assert_eq!(config.connection_timeout, Duration::from_secs(25));
2411    }
2412
2413    #[tokio::test]
2414    async fn test_dht_config_default() {
2415        let config = DHTConfig::default();
2416
2417        assert_eq!(config.k_value, 20);
2418        assert_eq!(config.alpha_value, 3);
2419        assert_eq!(config.refresh_interval, Duration::from_secs(600));
2420    }
2421
2422    #[test]
2423    fn test_connection_status_variants() {
2424        let connecting = ConnectionStatus::Connecting;
2425        let connected = ConnectionStatus::Connected;
2426        let disconnecting = ConnectionStatus::Disconnecting;
2427        let disconnected = ConnectionStatus::Disconnected;
2428        let failed = ConnectionStatus::Failed("test error".to_string());
2429
2430        assert_eq!(connecting, ConnectionStatus::Connecting);
2431        assert_eq!(connected, ConnectionStatus::Connected);
2432        assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
2433        assert_eq!(disconnected, ConnectionStatus::Disconnected);
2434        assert_ne!(connecting, connected);
2435
2436        if let ConnectionStatus::Failed(msg) = failed {
2437            assert_eq!(msg, "test error");
2438        } else {
2439            panic!("Expected Failed status");
2440        }
2441    }
2442
2443    #[tokio::test]
2444    async fn test_node_creation() -> Result<()> {
2445        let config = create_test_node_config();
2446        let node = P2PNode::new(config).await?;
2447
2448        // PeerId is derived from the cryptographic identity (32-byte BLAKE3 hash)
2449        assert_eq!(node.peer_id().to_hex().len(), 64);
2450        assert!(!node.is_running());
2451        assert_eq!(node.peer_count().await, 0);
2452        assert!(node.connected_peers().await.is_empty());
2453
2454        Ok(())
2455    }
2456
2457    #[tokio::test]
2458    async fn test_node_lifecycle() -> Result<()> {
2459        let config = create_test_node_config();
2460        let node = P2PNode::new(config).await?;
2461
2462        // Initially not running
2463        assert!(!node.is_running());
2464
2465        // Start the node
2466        node.start().await?;
2467        assert!(node.is_running());
2468
2469        // Check listen addresses were set (at least one)
2470        let listen_addrs = node.listen_addrs().await;
2471        assert!(
2472            !listen_addrs.is_empty(),
2473            "Expected at least one listening address"
2474        );
2475
2476        // Stop the node
2477        node.stop().await?;
2478        assert!(!node.is_running());
2479
2480        Ok(())
2481    }
2482
2483    #[tokio::test]
2484    async fn test_peer_connection() -> Result<()> {
2485        let config1 = create_test_node_config();
2486        let config2 = create_test_node_config();
2487
2488        let node1 = P2PNode::new(config1).await?;
2489        let node2 = P2PNode::new(config2).await?;
2490
2491        node1.start().await?;
2492        node2.start().await?;
2493
2494        let node2_addr = node2
2495            .listen_addrs()
2496            .await
2497            .into_iter()
2498            .find(|a| a.is_ipv4())
2499            .ok_or_else(|| {
2500                P2PError::Network(crate::error::NetworkError::InvalidAddress(
2501                    "Node 2 did not expose an IPv4 listen address".into(),
2502                ))
2503            })?;
2504
2505        // Connect to a real peer (unsigned — no node_identity configured).
2506        // connect_peer returns a transport-level channel ID (String), not a PeerId.
2507        let channel_id = node1.connect_peer(&node2_addr).await?;
2508
2509        // Unauthenticated connections don't appear in the app-level peer maps.
2510        // Verify transport-level tracking via is_connection_active / peers map.
2511        assert!(node1.is_connection_active(&channel_id).await);
2512
2513        // Get peer info from the transport-level peers map (keyed by channel ID)
2514        let peer_info = node1.transport.peer_info_by_channel(&channel_id).await;
2515        assert!(peer_info.is_some());
2516        let info = peer_info.expect("Peer info should exist after connect");
2517        assert_eq!(info.channel_id, channel_id);
2518        assert_eq!(info.status, ConnectionStatus::Connected);
2519        assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
2520
2521        // Disconnect the channel
2522        node1.remove_channel(&channel_id).await;
2523        assert!(!node1.is_connection_active(&channel_id).await);
2524
2525        node1.stop().await?;
2526        node2.stop().await?;
2527
2528        Ok(())
2529    }
2530
2531    #[tokio::test]
2532    async fn test_connect_peer_rejects_tcp_multiaddr() -> Result<()> {
2533        let config = create_test_node_config();
2534        let node = P2PNode::new(config).await?;
2535
2536        let tcp_addr: MultiAddr = "/ip4/127.0.0.1/tcp/1".parse().unwrap();
2537        let result = node.connect_peer(&tcp_addr).await;
2538
2539        assert!(
2540            matches!(
2541                result,
2542                Err(P2PError::Network(
2543                    crate::error::NetworkError::InvalidAddress(_)
2544                ))
2545            ),
2546            "TCP multiaddrs should be rejected before a QUIC dial is attempted, got: {:?}",
2547            result
2548        );
2549
2550        Ok(())
2551    }
2552
2553    // TODO(windows): Investigate QUIC connection issues on Windows CI
2554    // This test consistently fails on Windows GitHub Actions runners with
2555    // "All connect attempts failed" even with IPv4-only config, long delays,
2556    // and multiple retry attempts. The underlying saorsa-transport library may have
2557    // issues on Windows that need investigation.
2558    // See: https://github.com/WithAutonomi/saorsa-core/issues/TBD
2559    #[cfg_attr(target_os = "windows", ignore)]
2560    #[tokio::test]
2561    async fn test_event_subscription() -> Result<()> {
2562        // PeerConnected/PeerDisconnected only fire for authenticated peers
2563        // (nodes with node_identity that send signed messages).
2564        // Configure both nodes with identities so the event subscription test works.
2565        let identity1 =
2566            Arc::new(NodeIdentity::generate().expect("should generate identity for test node1"));
2567        let identity2 =
2568            Arc::new(NodeIdentity::generate().expect("should generate identity for test node2"));
2569
2570        let mut config1 = create_test_node_config();
2571        config1.ipv6 = false;
2572        config1.node_identity = Some(identity1);
2573
2574        let node2_peer_id = *identity2.peer_id();
2575        let mut config2 = create_test_node_config();
2576        config2.ipv6 = false;
2577        config2.node_identity = Some(identity2);
2578
2579        let node1 = P2PNode::new(config1).await?;
2580        let node2 = P2PNode::new(config2).await?;
2581
2582        node1.start().await?;
2583        node2.start().await?;
2584
2585        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
2586
2587        // Subscribe to node2's events (node2 will receive the signed message)
2588        let mut events = node2.subscribe_events();
2589
2590        let node2_addr = node2.local_addr().ok_or_else(|| {
2591            P2PError::Network(crate::error::NetworkError::ProtocolError(
2592                "No listening address".to_string().into(),
2593            ))
2594        })?;
2595
2596        // Connect node1 → node2
2597        let mut channel_id = None;
2598        for attempt in 0..3 {
2599            if attempt > 0 {
2600                tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
2601            }
2602            match timeout(Duration::from_secs(2), node1.connect_peer(&node2_addr)).await {
2603                Ok(Ok(id)) => {
2604                    channel_id = Some(id);
2605                    break;
2606                }
2607                Ok(Err(_)) | Err(_) => continue,
2608            }
2609        }
2610        let channel_id = channel_id.expect("Failed to connect after 3 attempts");
2611
2612        // Wait for identity exchange to complete via wait_for_peer_identity.
2613        let target_peer_id = node1
2614            .wait_for_peer_identity(&channel_id, Duration::from_secs(2))
2615            .await?;
2616        assert_eq!(target_peer_id, node2_peer_id);
2617
2618        // node1 sends a signed message → node2 authenticates → PeerConnected fires on node2
2619        node1
2620            .send_message(&target_peer_id, "test-topic", b"hello".to_vec(), &[])
2621            .await?;
2622
2623        // Check for PeerConnected event on node2
2624        let event = timeout(Duration::from_secs(2), async {
2625            loop {
2626                match events.recv().await {
2627                    Ok(P2PEvent::PeerConnected(id, _)) => return Ok(id),
2628                    Ok(P2PEvent::Message { .. }) => continue, // skip messages
2629                    Ok(_) => continue,
2630                    Err(e) => return Err(e),
2631                }
2632            }
2633        })
2634        .await;
2635        assert!(event.is_ok(), "Should receive PeerConnected event");
2636        let connected_peer_id = event.expect("Timed out").expect("Channel error");
2637        // The connected peer ID should be node1's app-level ID (a valid PeerId)
2638        assert!(
2639            connected_peer_id.0.iter().any(|&b| b != 0),
2640            "PeerConnected should carry a non-zero peer ID"
2641        );
2642
2643        node1.stop().await?;
2644        node2.stop().await?;
2645
2646        Ok(())
2647    }
2648
2649    // TODO(windows): Same QUIC connection issues as test_event_subscription
2650    #[cfg_attr(target_os = "windows", ignore)]
2651    #[tokio::test]
2652    async fn test_message_sending() -> Result<()> {
2653        // Create two nodes (IPv4-only loopback)
2654        let mut config1 = create_test_node_config();
2655        config1.ipv6 = false;
2656        let node1 = P2PNode::new(config1).await?;
2657        node1.start().await?;
2658
2659        let mut config2 = create_test_node_config();
2660        config2.ipv6 = false;
2661        let node2 = P2PNode::new(config2).await?;
2662        node2.start().await?;
2663
2664        // Wait a bit for nodes to start listening
2665        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
2666
2667        // Get actual listening address of node2
2668        let node2_addr = node2.local_addr().ok_or_else(|| {
2669            P2PError::Network(crate::error::NetworkError::ProtocolError(
2670                "No listening address".to_string().into(),
2671            ))
2672        })?;
2673
2674        // Connect node1 to node2
2675        let channel_id =
2676            match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
2677                Ok(res) => res?,
2678                Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2679            };
2680
2681        // Wait for identity exchange via wait_for_peer_identity.
2682        let target_peer_id = node1
2683            .wait_for_peer_identity(&channel_id, Duration::from_secs(2))
2684            .await?;
2685        assert_eq!(target_peer_id, node2.peer_id().clone());
2686
2687        // Send a message
2688        let message_data = b"Hello, peer!".to_vec();
2689        let result = match timeout(
2690            Duration::from_millis(500),
2691            node1.send_message(&target_peer_id, "test-protocol", message_data, &[]),
2692        )
2693        .await
2694        {
2695            Ok(res) => res,
2696            Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2697        };
2698        // For now, we'll just check that we don't get a "not connected" error
2699        // The actual send might fail due to no handler on the other side
2700        if let Err(e) = &result {
2701            assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
2702        }
2703
2704        // Try to send to non-existent peer
2705        let non_existent_peer = PeerId::from_bytes([0xFFu8; 32]);
2706        let result = node1
2707            .send_message(&non_existent_peer, "test-protocol", vec![], &[])
2708            .await;
2709        assert!(result.is_err(), "Sending to non-existent peer should fail");
2710
2711        node1.stop().await?;
2712        node2.stop().await?;
2713
2714        Ok(())
2715    }
2716
2717    #[tokio::test]
2718    async fn test_remote_mcp_operations() -> Result<()> {
2719        let config = create_test_node_config();
2720        let node = P2PNode::new(config).await?;
2721
2722        // MCP removed; test reduced to simple start/stop
2723        node.start().await?;
2724        node.stop().await?;
2725        Ok(())
2726    }
2727
2728    #[tokio::test]
2729    async fn test_health_check() -> Result<()> {
2730        let config = create_test_node_config();
2731        let node = P2PNode::new(config).await?;
2732
2733        // Health check should pass with no connections
2734        let result = node.health_check().await;
2735        assert!(result.is_ok());
2736
2737        // Note: We're not actually connecting to real peers here
2738        // since that would require running bootstrap nodes.
2739        // The health check should still pass with no connections.
2740
2741        Ok(())
2742    }
2743
2744    #[tokio::test]
2745    async fn test_node_uptime() -> Result<()> {
2746        let config = create_test_node_config();
2747        let node = P2PNode::new(config).await?;
2748
2749        let uptime1 = node.uptime();
2750        assert!(uptime1 >= Duration::from_secs(0));
2751
2752        // Wait a bit
2753        tokio::time::sleep(Duration::from_millis(10)).await;
2754
2755        let uptime2 = node.uptime();
2756        assert!(uptime2 > uptime1);
2757
2758        Ok(())
2759    }
2760
2761    #[tokio::test]
2762    async fn test_node_config_access() -> Result<()> {
2763        let config = create_test_node_config();
2764        let node = P2PNode::new(config).await?;
2765
2766        let node_config = node.config();
2767        assert_eq!(node_config.max_connections, 100);
2768        // MCP removed
2769
2770        Ok(())
2771    }
2772
2773    #[tokio::test]
2774    async fn test_mcp_server_access() -> Result<()> {
2775        let config = create_test_node_config();
2776        let _node = P2PNode::new(config).await?;
2777
2778        // MCP removed
2779        Ok(())
2780    }
2781
2782    #[tokio::test]
2783    async fn test_dht_access() -> Result<()> {
2784        let config = create_test_node_config();
2785        let node = P2PNode::new(config).await?;
2786
2787        // DHT is always available
2788        let _dht = node.dht();
2789
2790        Ok(())
2791    }
2792
2793    #[tokio::test]
2794    async fn test_node_config_builder() -> Result<()> {
2795        let bootstrap: MultiAddr = "/ip4/127.0.0.1/udp/9000/quic".parse().unwrap();
2796
2797        let config = NodeConfig::builder()
2798            .local(true)
2799            .ipv6(true)
2800            .bootstrap_peer(bootstrap)
2801            .connection_timeout(Duration::from_secs(15))
2802            .max_connections(200)
2803            .max_message_size(TEST_MAX_MESSAGE_SIZE)
2804            .build()?;
2805
2806        assert_eq!(config.listen_addrs().len(), 2); // IPv4 + IPv6
2807        assert!(config.local);
2808        assert!(config.ipv6);
2809        assert_eq!(config.bootstrap_peers.len(), 1);
2810        assert_eq!(config.connection_timeout, Duration::from_secs(15));
2811        assert_eq!(config.max_connections, 200);
2812        assert_eq!(config.max_message_size, Some(TEST_MAX_MESSAGE_SIZE));
2813        assert!(config.allow_loopback); // auto-enabled by local(true)
2814
2815        Ok(())
2816    }
2817
2818    #[tokio::test]
2819    async fn test_bootstrap_peers() -> Result<()> {
2820        let mut config = create_test_node_config();
2821        config.bootstrap_peers = vec![
2822            crate::MultiAddr::from_ipv4(std::net::Ipv4Addr::LOCALHOST, 9200),
2823            crate::MultiAddr::from_ipv4(std::net::Ipv4Addr::LOCALHOST, 9201),
2824        ];
2825
2826        let node = P2PNode::new(config).await?;
2827
2828        // Start node (which attempts to connect to bootstrap peers)
2829        node.start().await?;
2830
2831        // In a test environment, bootstrap peers may not be available
2832        // The test verifies the node starts correctly with bootstrap configuration
2833        // Peer count may include local/internal tracking, so we just verify it's reasonable
2834        let _peer_count = node.peer_count().await;
2835
2836        node.stop().await?;
2837        Ok(())
2838    }
2839
2840    #[tokio::test]
2841    async fn test_peer_info_structure() {
2842        let peer_info = PeerInfo {
2843            channel_id: "test_peer".to_string(),
2844            addresses: vec!["/ip4/127.0.0.1/tcp/9000".parse::<MultiAddr>().unwrap()],
2845            connected_at: Instant::now(),
2846            last_seen: Instant::now(),
2847            status: ConnectionStatus::Connected,
2848            protocols: vec!["test-protocol".to_string()],
2849            heartbeat_count: 0,
2850        };
2851
2852        assert_eq!(peer_info.channel_id, "test_peer");
2853        assert_eq!(peer_info.addresses.len(), 1);
2854        assert_eq!(peer_info.status, ConnectionStatus::Connected);
2855        assert_eq!(peer_info.protocols.len(), 1);
2856    }
2857
2858    #[tokio::test]
2859    async fn test_serialization() -> Result<()> {
2860        // Test that configs can be serialized/deserialized
2861        let config = create_test_node_config();
2862        let serialized = serde_json::to_string(&config)?;
2863        let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
2864
2865        assert_eq!(config.local, deserialized.local);
2866        assert_eq!(config.port, deserialized.port);
2867        assert_eq!(config.ipv6, deserialized.ipv6);
2868        assert_eq!(config.bootstrap_peers, deserialized.bootstrap_peers);
2869
2870        Ok(())
2871    }
2872
2873    #[tokio::test]
2874    async fn test_get_channel_id_by_address_found() -> Result<()> {
2875        let config = create_test_node_config();
2876        let node = P2PNode::new(config).await?;
2877
2878        // Manually insert a peer for testing
2879        let test_channel_id = "peer_test_123".to_string();
2880        let test_address = "192.168.1.100:9000";
2881        let test_multiaddr = MultiAddr::quic(test_address.parse().unwrap());
2882
2883        let peer_info = PeerInfo {
2884            channel_id: test_channel_id.clone(),
2885            addresses: vec![test_multiaddr],
2886            connected_at: Instant::now(),
2887            last_seen: Instant::now(),
2888            status: ConnectionStatus::Connected,
2889            protocols: vec!["test-protocol".to_string()],
2890            heartbeat_count: 0,
2891        };
2892
2893        node.transport
2894            .inject_peer(test_channel_id.clone(), peer_info)
2895            .await;
2896
2897        // Test: Find channel by address
2898        let lookup_addr = MultiAddr::quic(test_address.parse().unwrap());
2899        let found_channel_id = node.get_channel_id_by_address(&lookup_addr).await;
2900        assert_eq!(found_channel_id, Some(test_channel_id));
2901
2902        Ok(())
2903    }
2904
2905    #[tokio::test]
2906    async fn test_get_channel_id_by_address_not_found() -> Result<()> {
2907        let config = create_test_node_config();
2908        let node = P2PNode::new(config).await?;
2909
2910        // Test: Try to find a channel that doesn't exist
2911        let unknown_addr = MultiAddr::quic("192.168.1.200:9000".parse().unwrap());
2912        let result = node.get_channel_id_by_address(&unknown_addr).await;
2913        assert_eq!(result, None);
2914
2915        Ok(())
2916    }
2917
2918    #[tokio::test]
2919    async fn test_get_channel_id_by_address_invalid_format() -> Result<()> {
2920        let config = create_test_node_config();
2921        let node = P2PNode::new(config).await?;
2922
2923        // Test: Non-IP address should return None (no matching socket addr)
2924        let ble_addr = MultiAddr::new(crate::address::TransportAddr::Ble {
2925            mac: [0x02, 0x00, 0x00, 0x00, 0x00, 0x01],
2926            psm: 0x0025,
2927        });
2928        let result = node.get_channel_id_by_address(&ble_addr).await;
2929        assert_eq!(result, None);
2930
2931        Ok(())
2932    }
2933
2934    #[tokio::test]
2935    async fn test_get_channel_id_by_address_multiple_peers() -> Result<()> {
2936        let config = create_test_node_config();
2937        let node = P2PNode::new(config).await?;
2938
2939        // Add multiple peers with different addresses
2940        let peer1_id = "peer_1".to_string();
2941        let peer1_addr_str = "192.168.1.101:9001";
2942        let peer1_multiaddr = MultiAddr::quic(peer1_addr_str.parse().unwrap());
2943
2944        let peer2_id = "peer_2".to_string();
2945        let peer2_addr_str = "192.168.1.102:9002";
2946        let peer2_multiaddr = MultiAddr::quic(peer2_addr_str.parse().unwrap());
2947
2948        let peer1_info = PeerInfo {
2949            channel_id: peer1_id.clone(),
2950            addresses: vec![peer1_multiaddr],
2951            connected_at: Instant::now(),
2952            last_seen: Instant::now(),
2953            status: ConnectionStatus::Connected,
2954            protocols: vec!["test-protocol".to_string()],
2955            heartbeat_count: 0,
2956        };
2957
2958        let peer2_info = PeerInfo {
2959            channel_id: peer2_id.clone(),
2960            addresses: vec![peer2_multiaddr],
2961            connected_at: Instant::now(),
2962            last_seen: Instant::now(),
2963            status: ConnectionStatus::Connected,
2964            protocols: vec!["test-protocol".to_string()],
2965            heartbeat_count: 0,
2966        };
2967
2968        node.transport
2969            .inject_peer(peer1_id.clone(), peer1_info)
2970            .await;
2971        node.transport
2972            .inject_peer(peer2_id.clone(), peer2_info)
2973            .await;
2974
2975        // Test: Find each channel by their unique address
2976        let found_peer1 = node
2977            .get_channel_id_by_address(&MultiAddr::quic(peer1_addr_str.parse().unwrap()))
2978            .await;
2979        let found_peer2 = node
2980            .get_channel_id_by_address(&MultiAddr::quic(peer2_addr_str.parse().unwrap()))
2981            .await;
2982
2983        assert_eq!(found_peer1, Some(peer1_id));
2984        assert_eq!(found_peer2, Some(peer2_id));
2985
2986        Ok(())
2987    }
2988
2989    #[tokio::test]
2990    async fn test_list_active_connections_empty() -> Result<()> {
2991        let config = create_test_node_config();
2992        let node = P2PNode::new(config).await?;
2993
2994        // Test: No connections initially
2995        let connections = node.list_active_connections().await;
2996        assert!(connections.is_empty());
2997
2998        Ok(())
2999    }
3000
3001    #[tokio::test]
3002    async fn test_list_active_connections_with_peers() -> Result<()> {
3003        let config = create_test_node_config();
3004        let node = P2PNode::new(config).await?;
3005
3006        // Add multiple peers
3007        let peer1_id = "peer_1".to_string();
3008        let peer1_addrs = vec![
3009            MultiAddr::quic("192.168.1.101:9001".parse().unwrap()),
3010            MultiAddr::quic("192.168.1.101:9002".parse().unwrap()),
3011        ];
3012
3013        let peer2_id = "peer_2".to_string();
3014        let peer2_addrs = vec![MultiAddr::quic("192.168.1.102:9003".parse().unwrap())];
3015
3016        let peer1_info = PeerInfo {
3017            channel_id: peer1_id.clone(),
3018            addresses: peer1_addrs.clone(),
3019            connected_at: Instant::now(),
3020            last_seen: Instant::now(),
3021            status: ConnectionStatus::Connected,
3022            protocols: vec!["test-protocol".to_string()],
3023            heartbeat_count: 0,
3024        };
3025
3026        let peer2_info = PeerInfo {
3027            channel_id: peer2_id.clone(),
3028            addresses: peer2_addrs.clone(),
3029            connected_at: Instant::now(),
3030            last_seen: Instant::now(),
3031            status: ConnectionStatus::Connected,
3032            protocols: vec!["test-protocol".to_string()],
3033            heartbeat_count: 0,
3034        };
3035
3036        node.transport
3037            .inject_peer(peer1_id.clone(), peer1_info)
3038            .await;
3039        node.transport
3040            .inject_peer(peer2_id.clone(), peer2_info)
3041            .await;
3042
3043        // Also add to active_connections (list_active_connections iterates over this)
3044        node.transport
3045            .inject_active_connection(peer1_id.clone())
3046            .await;
3047        node.transport
3048            .inject_active_connection(peer2_id.clone())
3049            .await;
3050
3051        // Test: List all active connections
3052        let connections = node.list_active_connections().await;
3053        assert_eq!(connections.len(), 2);
3054
3055        // Verify peer1 and peer2 are in the list
3056        let peer1_conn = connections.iter().find(|(id, _)| id == &peer1_id);
3057        let peer2_conn = connections.iter().find(|(id, _)| id == &peer2_id);
3058
3059        assert!(peer1_conn.is_some());
3060        assert!(peer2_conn.is_some());
3061
3062        // Verify addresses match
3063        assert_eq!(peer1_conn.unwrap().1, peer1_addrs);
3064        assert_eq!(peer2_conn.unwrap().1, peer2_addrs);
3065
3066        Ok(())
3067    }
3068
3069    #[tokio::test]
3070    async fn test_remove_channel_success() -> Result<()> {
3071        let config = create_test_node_config();
3072        let node = P2PNode::new(config).await?;
3073
3074        // Add a peer
3075        let channel_id = "peer_to_remove".to_string();
3076        let channel_peer_id = PeerId::from_name(&channel_id);
3077        let peer_info = PeerInfo {
3078            channel_id: channel_id.clone(),
3079            addresses: vec![MultiAddr::quic("192.168.1.100:9000".parse().unwrap())],
3080            connected_at: Instant::now(),
3081            last_seen: Instant::now(),
3082            status: ConnectionStatus::Connected,
3083            protocols: vec!["test-protocol".to_string()],
3084            heartbeat_count: 0,
3085        };
3086
3087        node.transport
3088            .inject_peer(channel_id.clone(), peer_info)
3089            .await;
3090        node.transport
3091            .inject_peer_to_channel(channel_peer_id, channel_id.clone())
3092            .await;
3093
3094        // Verify peer exists
3095        assert!(node.is_peer_connected(&channel_peer_id).await);
3096
3097        // Remove the channel
3098        let removed = node.remove_channel(&channel_id).await;
3099        assert!(removed);
3100
3101        // Verify peer no longer exists
3102        assert!(!node.is_peer_connected(&channel_peer_id).await);
3103
3104        Ok(())
3105    }
3106
3107    #[tokio::test]
3108    async fn test_remove_channel_nonexistent() -> Result<()> {
3109        let config = create_test_node_config();
3110        let node = P2PNode::new(config).await?;
3111
3112        // Try to remove a channel that doesn't exist
3113        let removed = node.remove_channel("nonexistent_peer").await;
3114        assert!(!removed);
3115
3116        Ok(())
3117    }
3118
3119    #[tokio::test]
3120    async fn test_is_peer_connected() -> Result<()> {
3121        let config = create_test_node_config();
3122        let node = P2PNode::new(config).await?;
3123
3124        let channel_id = "test_peer".to_string();
3125        let channel_peer_id = PeerId::from_name(&channel_id);
3126
3127        // Initially not connected
3128        assert!(!node.is_peer_connected(&channel_peer_id).await);
3129
3130        // Add peer
3131        let peer_info = PeerInfo {
3132            channel_id: channel_id.clone(),
3133            addresses: vec![MultiAddr::quic("192.168.1.100:9000".parse().unwrap())],
3134            connected_at: Instant::now(),
3135            last_seen: Instant::now(),
3136            status: ConnectionStatus::Connected,
3137            protocols: vec!["test-protocol".to_string()],
3138            heartbeat_count: 0,
3139        };
3140
3141        node.transport
3142            .inject_peer(channel_id.clone(), peer_info)
3143            .await;
3144        node.transport
3145            .inject_peer_to_channel(channel_peer_id, channel_id.clone())
3146            .await;
3147
3148        // Now connected
3149        assert!(node.is_peer_connected(&channel_peer_id).await);
3150
3151        // Remove channel
3152        node.remove_channel(&channel_id).await;
3153
3154        // No longer connected
3155        assert!(!node.is_peer_connected(&channel_peer_id).await);
3156
3157        Ok(())
3158    }
3159
3160    #[test]
3161    fn test_normalize_ipv6_wildcard() {
3162        use std::net::{IpAddr, Ipv6Addr, SocketAddr};
3163
3164        let wildcard = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 8080);
3165        let normalized = normalize_wildcard_to_loopback(wildcard);
3166
3167        assert_eq!(normalized.ip(), IpAddr::V6(Ipv6Addr::LOCALHOST));
3168        assert_eq!(normalized.port(), 8080);
3169    }
3170
3171    #[test]
3172    fn test_normalize_ipv4_wildcard() {
3173        use std::net::{IpAddr, Ipv4Addr, SocketAddr};
3174
3175        let wildcard = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9000);
3176        let normalized = normalize_wildcard_to_loopback(wildcard);
3177
3178        assert_eq!(normalized.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
3179        assert_eq!(normalized.port(), 9000);
3180    }
3181
3182    #[test]
3183    fn test_normalize_specific_address_unchanged() {
3184        let specific: std::net::SocketAddr = "192.168.1.100:3000".parse().unwrap();
3185        let normalized = normalize_wildcard_to_loopback(specific);
3186
3187        assert_eq!(normalized, specific);
3188    }
3189
3190    #[test]
3191    fn test_normalize_loopback_unchanged() {
3192        use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
3193
3194        let loopback_v6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5000);
3195        let normalized_v6 = normalize_wildcard_to_loopback(loopback_v6);
3196        assert_eq!(normalized_v6, loopback_v6);
3197
3198        let loopback_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
3199        let normalized_v4 = normalize_wildcard_to_loopback(loopback_v4);
3200        assert_eq!(normalized_v4, loopback_v4);
3201    }
3202
3203    // ---- parse_protocol_message regression tests ----
3204
3205    /// Get current Unix timestamp for tests
3206    fn current_timestamp() -> u64 {
3207        std::time::SystemTime::now()
3208            .duration_since(std::time::UNIX_EPOCH)
3209            .map(|d| d.as_secs())
3210            .unwrap_or(0)
3211    }
3212
3213    /// Helper to create a postcard-serialized unsigned WireMessage for tests
3214    fn make_wire_bytes(protocol: &str, data: Vec<u8>, from: &str, timestamp: u64) -> Vec<u8> {
3215        let msg = WireMessage {
3216            protocol: protocol.to_string(),
3217            data,
3218            from: PeerId::from_name(from),
3219            timestamp,
3220            user_agent: String::new(),
3221            public_key: Vec::new(),
3222            signature: Vec::new(),
3223        };
3224        postcard::to_stdvec(&msg).unwrap()
3225    }
3226
3227    /// Helper to create a postcard-serialized signed WireMessage for tests.
3228    fn make_signed_wire_bytes(
3229        identity: &NodeIdentity,
3230        protocol: &str,
3231        data: Vec<u8>,
3232        timestamp: u64,
3233    ) -> Vec<u8> {
3234        let from = *identity.peer_id();
3235        let user_agent = "test/1.0";
3236        let signable =
3237            postcard::to_stdvec(&(protocol, data.as_slice(), &from, timestamp, user_agent))
3238                .unwrap();
3239        let sig = identity.sign(&signable).expect("signing should succeed");
3240        let msg = WireMessage {
3241            protocol: protocol.to_string(),
3242            data,
3243            from,
3244            timestamp,
3245            user_agent: user_agent.to_string(),
3246            public_key: identity.public_key().as_bytes().to_vec(),
3247            signature: sig.as_bytes().to_vec(),
3248        };
3249        postcard::to_stdvec(&msg).unwrap()
3250    }
3251
3252    #[test]
3253    fn test_parse_protocol_message_uses_transport_peer_id_as_source() {
3254        // Regression: For unsigned messages, P2PEvent::Message.source must be the
3255        // transport peer ID, NOT the "from" field from the wire message.
3256        let transport_id = "abcdef0123456789";
3257        let logical_id = "spoofed-logical-id";
3258        let bytes = make_wire_bytes("test/v1", vec![1, 2, 3], logical_id, current_timestamp());
3259
3260        let parsed =
3261            parse_protocol_message(&bytes, transport_id).expect("valid message should parse");
3262
3263        // Unsigned message: no authenticated node ID
3264        assert!(parsed.authenticated_node_id.is_none());
3265
3266        match parsed.event {
3267            P2PEvent::Message {
3268                topic,
3269                source,
3270                transport_source,
3271                timestamp: _,
3272                data,
3273            } => {
3274                assert!(source.is_none(), "unsigned message source must be None");
3275                assert!(
3276                    transport_source.is_none(),
3277                    "non-socket transport source should not produce an IP transport address"
3278                );
3279                assert_eq!(topic, "test/v1");
3280                assert_eq!(data, vec![1u8, 2, 3]);
3281            }
3282            other => panic!("expected P2PEvent::Message, got {:?}", other),
3283        }
3284    }
3285
3286    #[test]
3287    fn test_parse_protocol_message_rejects_invalid_bytes() {
3288        // Random bytes that are not valid bincode should be rejected
3289        assert!(parse_protocol_message(b"not valid bincode", "peer-id").is_none());
3290    }
3291
3292    #[test]
3293    fn test_parse_protocol_message_rejects_truncated_message() {
3294        // A truncated bincode message should fail to deserialize
3295        let full_bytes = make_wire_bytes("test/v1", vec![1, 2, 3], "sender", current_timestamp());
3296        let truncated = &full_bytes[..full_bytes.len() / 2];
3297        assert!(parse_protocol_message(truncated, "peer-id").is_none());
3298    }
3299
3300    #[test]
3301    fn test_parse_protocol_message_empty_payload() {
3302        let bytes = make_wire_bytes("ping", vec![], "sender", current_timestamp());
3303
3304        let parsed = parse_protocol_message(&bytes, "transport-peer")
3305            .expect("valid message with empty data should parse");
3306
3307        match parsed.event {
3308            P2PEvent::Message { data, .. } => assert!(data.is_empty()),
3309            other => panic!("expected P2PEvent::Message, got {:?}", other),
3310        }
3311    }
3312
3313    #[test]
3314    fn test_parse_protocol_message_records_ip_transport_source() {
3315        let bytes = make_wire_bytes("ping", vec![1], "sender", current_timestamp());
3316
3317        let parsed =
3318            parse_protocol_message(&bytes, "192.168.1.2:4567").expect("valid message should parse");
3319
3320        match parsed.event {
3321            P2PEvent::Message {
3322                transport_source, ..
3323            } => {
3324                assert_eq!(
3325                    transport_source,
3326                    Some(MultiAddr::quic("192.168.1.2:4567".parse().unwrap()))
3327                );
3328            }
3329            other => panic!("expected P2PEvent::Message, got {:?}", other),
3330        }
3331    }
3332
3333    #[test]
3334    fn test_parse_protocol_message_preserves_binary_payload() {
3335        // Verify that arbitrary byte values (including 0xFF, 0x00) survive round-trip
3336        let payload: Vec<u8> = (0..=255).collect();
3337        let bytes = make_wire_bytes("binary/v1", payload.clone(), "sender", current_timestamp());
3338
3339        let parsed = parse_protocol_message(&bytes, "peer-id")
3340            .expect("valid message with full byte range should parse");
3341
3342        match parsed.event {
3343            P2PEvent::Message { data, topic, .. } => {
3344                assert_eq!(topic, "binary/v1");
3345                assert_eq!(
3346                    data, payload,
3347                    "payload must survive bincode round-trip exactly"
3348                );
3349            }
3350            other => panic!("expected P2PEvent::Message, got {:?}", other),
3351        }
3352    }
3353
3354    #[test]
3355    fn test_parse_signed_message_verifies_and_uses_node_id() {
3356        let identity = NodeIdentity::generate().expect("should generate identity");
3357        let protocol = "test/signed";
3358        let data: Vec<u8> = vec![10, 20, 30];
3359        // The `from` field must match the PeerId derived from the public key.
3360        let from = *identity.peer_id();
3361        let timestamp = current_timestamp();
3362        let user_agent = "test/1.0";
3363
3364        // Compute signable bytes the same way create_protocol_message does
3365        let signable =
3366            postcard::to_stdvec(&(protocol, data.as_slice(), &from, timestamp, user_agent))
3367                .unwrap();
3368        let sig = identity.sign(&signable).expect("signing should succeed");
3369
3370        let msg = WireMessage {
3371            protocol: protocol.to_string(),
3372            data: data.clone(),
3373            from,
3374            timestamp,
3375            user_agent: user_agent.to_string(),
3376            public_key: identity.public_key().as_bytes().to_vec(),
3377            signature: sig.as_bytes().to_vec(),
3378        };
3379        let bytes = postcard::to_stdvec(&msg).unwrap();
3380
3381        let parsed =
3382            parse_protocol_message(&bytes, "transport-xyz").expect("signed message should parse");
3383
3384        let expected_peer_id = *identity.peer_id();
3385        assert_eq!(
3386            parsed.authenticated_node_id.as_ref(),
3387            Some(&expected_peer_id)
3388        );
3389
3390        match parsed.event {
3391            P2PEvent::Message { source, .. } => {
3392                assert_eq!(
3393                    source.as_ref(),
3394                    Some(&expected_peer_id),
3395                    "source should be the verified PeerId"
3396                );
3397            }
3398            other => panic!("expected P2PEvent::Message, got {:?}", other),
3399        }
3400    }
3401
3402    #[test]
3403    fn test_parse_message_with_bad_signature_is_rejected() {
3404        let identity = NodeIdentity::generate().expect("should generate identity");
3405        let protocol = "test/bad-sig";
3406        let data: Vec<u8> = vec![1, 2, 3];
3407        let from = *identity.peer_id();
3408        let timestamp = current_timestamp();
3409        let user_agent = "test/1.0";
3410
3411        // Sign correct signable bytes
3412        let signable =
3413            postcard::to_stdvec(&(protocol, data.as_slice(), &from, timestamp, user_agent))
3414                .unwrap();
3415        let sig = identity.sign(&signable).expect("signing should succeed");
3416
3417        // Tamper with the data (signature was over [1,2,3], not [99,99,99])
3418        let msg = WireMessage {
3419            protocol: protocol.to_string(),
3420            data: vec![99, 99, 99],
3421            from,
3422            timestamp,
3423            user_agent: user_agent.to_string(),
3424            public_key: identity.public_key().as_bytes().to_vec(),
3425            signature: sig.as_bytes().to_vec(),
3426        };
3427        let bytes = postcard::to_stdvec(&msg).unwrap();
3428
3429        assert!(
3430            parse_protocol_message(&bytes, "transport-xyz").is_none(),
3431            "message with bad signature should be rejected"
3432        );
3433    }
3434
3435    #[test]
3436    fn test_parse_message_with_mismatched_from_is_rejected() {
3437        let identity = NodeIdentity::generate().expect("should generate identity");
3438        let protocol = "test/from-mismatch";
3439        let data: Vec<u8> = vec![1, 2, 3];
3440        // Use a `from` field that does NOT match the public key's PeerId.
3441        let fake_from = PeerId::from_bytes([0xDE; 32]);
3442        let timestamp = current_timestamp();
3443        let user_agent = "test/1.0";
3444
3445        let signable =
3446            postcard::to_stdvec(&(protocol, data.as_slice(), &fake_from, timestamp, user_agent))
3447                .unwrap();
3448        let sig = identity.sign(&signable).expect("signing should succeed");
3449
3450        let msg = WireMessage {
3451            protocol: protocol.to_string(),
3452            data,
3453            from: fake_from,
3454            timestamp,
3455            user_agent: user_agent.to_string(),
3456            public_key: identity.public_key().as_bytes().to_vec(),
3457            signature: sig.as_bytes().to_vec(),
3458        };
3459        let bytes = postcard::to_stdvec(&msg).unwrap();
3460
3461        assert!(
3462            parse_protocol_message(&bytes, "transport-xyz").is_none(),
3463            "message with mismatched from field should be rejected"
3464        );
3465    }
3466
3467    #[test]
3468    fn test_parse_protocol_message_accepts_arbitrary_timestamps() {
3469        // Clock skew between peers must not drop messages.
3470        // Regression: previously ±5 min tolerance silently rejected all
3471        // traffic when client and node clocks differed.
3472        let payload = vec![1, 2, 3];
3473
3474        // 10 hours in the past
3475        let old_ts = current_timestamp().saturating_sub(36_000);
3476        let old_bytes = make_wire_bytes("test/old", payload.clone(), "sender", old_ts);
3477        assert!(
3478            parse_protocol_message(&old_bytes, "peer-id").is_some(),
3479            "should accept unsigned message with timestamp 10h in the past"
3480        );
3481
3482        // 10 hours in the future
3483        let future_ts = current_timestamp().saturating_add(36_000);
3484        let future_bytes = make_wire_bytes("test/future", payload.clone(), "sender", future_ts);
3485        assert!(
3486            parse_protocol_message(&future_bytes, "peer-id").is_some(),
3487            "should accept unsigned message with timestamp 10h in the future"
3488        );
3489
3490        // Signed messages must take the same path: timestamp remains part of the
3491        // signed bytes for integrity, but is not used for wall-clock rejection.
3492        let identity = NodeIdentity::generate().expect("should generate identity");
3493        let signed_old =
3494            make_signed_wire_bytes(&identity, "test/signed-old", payload.clone(), old_ts);
3495        assert!(
3496            parse_protocol_message(&signed_old, "transport-xyz").is_some(),
3497            "should accept signed message with timestamp 10h in the past"
3498        );
3499
3500        let signed_future =
3501            make_signed_wire_bytes(&identity, "test/signed-future", payload, future_ts);
3502        assert!(
3503            parse_protocol_message(&signed_future, "transport-xyz").is_some(),
3504            "should accept signed message with timestamp 10h in the future"
3505        );
3506    }
3507
3508    #[test]
3509    fn test_parse_protocol_message_exposes_timestamp_on_event() {
3510        // After removing the wall-clock skew gate, the signed timestamp must
3511        // remain reachable on `P2PEvent::Message` so application-layer handlers
3512        // can implement freshness / replay defense.
3513        let ts: u64 = 1_234_567_890;
3514        let bytes = make_wire_bytes("test/ts", vec![9, 9, 9], "sender", ts);
3515        let parsed = parse_protocol_message(&bytes, "peer-id").expect("valid message should parse");
3516        match parsed.event {
3517            P2PEvent::Message { timestamp, .. } => {
3518                assert_eq!(timestamp, ts, "P2PEvent::Message.timestamp must round-trip");
3519            }
3520            other => panic!("expected P2PEvent::Message, got {:?}", other),
3521        }
3522    }
3523
3524    #[test]
3525    fn test_signed_message_timestamp_is_signature_covered() {
3526        // Sign once, mutate only the timestamp, assert rejection. This is the
3527        // only timestamp property still enforced by `parse_protocol_message`
3528        // after the wall-clock gate was removed: signature integrity.
3529        let identity = NodeIdentity::generate().expect("should generate identity");
3530        let ts: u64 = 1_700_000_000;
3531        let signed = make_signed_wire_bytes(&identity, "test/sig", vec![1, 2, 3], ts);
3532
3533        // Sanity: unmodified bytes parse and authenticate.
3534        let parsed = parse_protocol_message(&signed, "transport-xyz")
3535            .expect("unmodified signed message should parse");
3536        assert!(parsed.authenticated_node_id.is_some());
3537
3538        // Now tamper with just the timestamp on the wire and re-serialize.
3539        let mut tampered: WireMessage =
3540            postcard::from_bytes(&signed).expect("signed bytes must deserialize");
3541        tampered.timestamp = ts.wrapping_add(1);
3542        let tampered_bytes = postcard::to_stdvec(&tampered).expect("re-serialize");
3543
3544        assert!(
3545            parse_protocol_message(&tampered_bytes, "transport-xyz").is_none(),
3546            "timestamp-only mutation on a signed message must fail signature verification"
3547        );
3548    }
3549}