Skip to main content

saorsa_core/
network.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: david@saorsalabs.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! Network module
15//!
16//! This module provides core networking functionality for the P2P Foundation.
17//! It handles peer connections, network events, and node lifecycle management.
18
19use crate::PeerId;
20use crate::adaptive::trust::{TrustRecord, TrustSnapshot};
21use crate::adaptive::{AdaptiveDHT, AdaptiveDhtConfig, TrustEngine, TrustEvent};
22use crate::bootstrap::cache::{CachedCloseGroupPeer, CloseGroupCache};
23use crate::bootstrap::{BootstrapConfig, BootstrapManager};
24use crate::dht::core_engine::AddressType;
25use crate::dht_network_manager::{
26    DhtNetworkConfig, DhtNetworkEvent, DhtNetworkManager, IDENTITY_EXCHANGE_TIMEOUT,
27};
28use crate::error::{IdentityError, NetworkError, P2PError, P2pResult as Result};
29use crate::reachability::spawn_acquisition_driver;
30
31use crate::MultiAddr;
32use crate::identity::node_identity::{NodeIdentity, peer_id_from_public_key};
33use crate::quantum_crypto::saorsa_transport_integration::{MlDsaPublicKey, MlDsaSignature};
34use dashmap::DashMap;
35use futures::StreamExt;
36use parking_lot::Mutex as ParkingMutex;
37use serde::{Deserialize, Serialize};
38use std::collections::HashMap;
39use std::net::SocketAddr;
40use std::path::{Path, PathBuf};
41use std::sync::Arc;
42use std::sync::atomic::{AtomicBool, Ordering};
43use std::time::{Duration, SystemTime, UNIX_EPOCH};
44use tokio::sync::{Mutex as TokioMutex, RwLock, broadcast};
45use tokio::time::Instant;
46use tokio_util::sync::CancellationToken;
47use tracing::{debug, info, trace, warn};
48
/// Wire protocol message format for P2P communication.
///
/// Serialized with postcard for compact binary encoding.
///
/// Field declaration order is part of the wire format: postcard encodes
/// struct fields in declaration order, so existing fields must not be
/// reordered and new fields should only be appended at the end.
///
/// NOTE(review): because postcard is not self-describing, the
/// `#[serde(default)]` attributes on the trailing fields probably do NOT let
/// a shorter (older) encoding decode successfully — confirm before relying
/// on them for backward compatibility.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct WireMessage {
    /// Protocol/topic identifier.
    pub(crate) protocol: String,
    /// Raw payload bytes.
    pub(crate) data: Vec<u8>,
    /// Sender's peer ID (verified against transport-level identity).
    pub(crate) from: PeerId,
    /// Unix timestamp in seconds.
    pub(crate) timestamp: u64,
    /// User agent string identifying the sender's software.
    ///
    /// Convention: `"node/<version>"` for full DHT participants,
    /// `"client/<version>"` or `"<app>/<version>"` for ephemeral clients.
    /// Included in the signed bytes — tamper-proof.
    #[serde(default)]
    pub(crate) user_agent: String,
    /// Sender's ML-DSA-65 public key (1952 bytes). Empty if unsigned.
    #[serde(default)]
    pub(crate) public_key: Vec<u8>,
    /// ML-DSA-65 signature over the signable bytes. Empty if unsigned.
    #[serde(default)]
    pub(crate) signature: Vec<u8>,
}
76
/// Operating mode of a P2P node.
///
/// Determines the default user agent (see [`user_agent_for_mode`]) and the
/// DHT participation behavior. `Node` peers participate in the DHT routing
/// table; `Client` peers are treated as ephemeral and excluded from routing.
///
/// Keep the variant order stable: the derived serde impls may encode the
/// variant by index in binary formats.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum NodeMode {
    /// Full DHT-participant node that maintains routing state and routes messages.
    /// This is the `Default` variant.
    #[default]
    Node,
    /// Ephemeral client that connects to perform operations without joining the DHT.
    Client,
}
90
/// Internal listen mode controlling which network interfaces the node binds to.
///
/// Selected from `NodeConfig::local` and consumed by `build_listen_addrs`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ListenMode {
    /// Bind to all interfaces (`0.0.0.0` / `::`).
    Public,
    /// Bind to loopback only (`127.0.0.1` / `::1`).
    Local,
}
99
100/// Returns the default user agent string for the given mode.
101///
102/// - `Node` → `"node/<saorsa-core-version>"`
103/// - `Client` → `"client/<saorsa-core-version>"`
104pub fn user_agent_for_mode(mode: NodeMode) -> String {
105    let prefix = match mode {
106        NodeMode::Node => "node",
107        NodeMode::Client => "client",
108    };
109    format!("{prefix}/{}", env!("CARGO_PKG_VERSION"))
110}
111
/// Reports whether `user_agent` belongs to a full DHT participant.
///
/// Only agents beginning with the exact, case-sensitive prefix `"node/"`
/// qualify; clients and any other software do not.
pub fn is_dht_participant(user_agent: &str) -> bool {
    user_agent.strip_prefix("node/").is_some()
}
116
/// Capacity of the internal channel used by the message receiving system.
pub(crate) const MESSAGE_RECV_CHANNEL_CAPACITY: usize = 256;

/// Upper bound on request/response operations allowed in flight at once.
pub(crate) const MAX_ACTIVE_REQUESTS: usize = 256;

/// Largest timeout a single request may use: five minutes.
pub(crate) const MAX_REQUEST_TIMEOUT: Duration = Duration::from_secs(5 * 60);

/// Default listen port for the P2P node (used by `NodeConfig::default()`;
/// `NodeConfigBuilder` instead defaults to port `0`).
const DEFAULT_LISTEN_PORT: u16 = 9000;

/// Default cap on the number of concurrent connections.
const DEFAULT_MAX_CONNECTIONS: usize = 10_000;

/// Default connection timeout, in seconds.
///
/// The transport adapter keeps each direct Happy Eyeballs attempt short so
/// DHT lookups can skip offline peers quickly. This overall 25 s budget
/// leaves headroom for multi-stage connection strategies plus the identity
/// exchange, and matches the historical API default.
const DEFAULT_CONNECTION_TIMEOUT_SECS: u64 = 25;

/// Number of cached bootstrap peers to retrieve.
const BOOTSTRAP_PEER_BATCH_SIZE: usize = 20;

/// Budget, in seconds, for a bootstrap peer's identity exchange.
///
/// Deliberately tighter than the post-bootstrap budget
/// (`IDENTITY_EXCHANGE_TIMEOUT`, 5 s): bootstrap candidates are unverified,
/// and a stuck one must not head-of-line block convergence. Three seconds
/// covers loopback (<100 ms) and direct WAN paths (~1–2 s including one
/// handshake retry). A relay-tunnelled path with congested ML-DSA
/// verification may exceed it and fail the exchange; the bootstrap manager
/// then moves on to other candidates instead of retrying the same one.
///
/// `wait_for_peer_identity` short-circuits when the channel closes, so most
/// dead channels surface in microseconds regardless of this budget.
const BOOTSTRAP_IDENTITY_TIMEOUT_SECS: u64 = 3;

/// Maximum number of bootstrap peers dialed concurrently in Phase B.
///
/// Caps the fan-out of CLI-supplied plus cached bootstrap dials so that
/// simultaneous QUIC+PQC handshakes neither spike CPU nor saturate the UDP
/// socket. Kept low on purpose: every dial performs a full ML-KEM key
/// exchange and ML-DSA verification, and a cold-starting node has no spare
/// compute budget.
const MAX_CONCURRENT_BOOTSTRAP_DIALS: usize = 4;

/// Successful bootstrap connections after which a client-mode node stops
/// dialing further candidates.
///
/// Clients only route their own lookups (α = 3 parallel queries, so 6 peers
/// give roughly 2× redundancy) and do not serve the DHT, so a fully
/// populated close group gains them nothing. Stopping early trims cold-start
/// latency by skipping slow or dead candidates at the tail. Nodes always
/// dial every candidate so their routing table fully converges.
const CLIENT_BOOTSTRAP_TARGET: usize = 6;

/// Serde `default = "…"` helper that always yields `true`.
const fn default_true() -> bool {
    true
}
180
181/// Configuration for a P2P node
182#[derive(Debug, Clone, Serialize, Deserialize)]
183pub struct NodeConfig {
184    /// Bind to loopback only (`127.0.0.1` / `::1`).
185    ///
186    /// When `true`, the node listens on loopback addresses suitable for
187    /// local development and testing. When `false` (the default), the node
188    /// listens on all interfaces (`0.0.0.0` / `::`).
189    #[serde(default)]
190    pub local: bool,
191
192    /// Listen port. `0` means OS-assigned ephemeral port.
193    #[serde(default)]
194    pub port: u16,
195
196    /// Enable IPv6 dual-stack binding.
197    ///
198    /// When `true` (the default), both an IPv4 and an IPv6 address are
199    /// bound. When `false`, only IPv4 is used.
200    #[serde(default = "default_true")]
201    pub ipv6: bool,
202
203    /// Bootstrap peers to connect to on startup.
204    pub bootstrap_peers: Vec<crate::MultiAddr>,
205
206    // MCP removed; will be redesigned later
207    /// Connection timeout duration
208    pub connection_timeout: Duration,
209
210    /// Maximum number of concurrent connections
211    pub max_connections: usize,
212
213    /// DHT configuration
214    pub dht_config: DHTConfig,
215
216    /// Bootstrap cache configuration
217    pub bootstrap_cache_config: Option<BootstrapConfig>,
218
219    /// Optional IP diversity configuration for Sybil protection tuning.
220    ///
221    /// When set, this configuration is used by bootstrap peer discovery and
222    /// other diversity-enforcing subsystems. If `None`, defaults are used.
223    pub diversity_config: Option<crate::security::IPDiversityConfig>,
224
225    /// Optional override for the maximum application-layer message size.
226    ///
227    /// When `None`, the underlying saorsa-transport default is used.
228    #[serde(default)]
229    pub max_message_size: Option<usize>,
230
231    /// Optional node identity for app-level message signing.
232    ///
233    /// When set, outgoing messages are signed with the node's ML-DSA-65 key
234    /// and incoming signed messages are verified at the transport layer.
235    #[serde(skip)]
236    pub node_identity: Option<Arc<NodeIdentity>>,
237
238    /// Operating mode of this node.
239    ///
240    /// Determines the default user agent and DHT participation:
241    /// - `Node` → user agent `"node/<version>"`, added to DHT routing tables.
242    /// - `Client` → user agent `"client/<version>"`, treated as ephemeral.
243    #[serde(default)]
244    pub mode: NodeMode,
245
246    /// Optional custom user agent override.
247    ///
248    /// When `Some`, this value is used instead of the mode-derived default.
249    /// When `None`, the user agent is derived from [`NodeConfig::mode`].
250    #[serde(default, skip_serializing_if = "Option::is_none")]
251    pub custom_user_agent: Option<String>,
252
253    /// Allow loopback addresses (127.0.0.1, ::1) in the transport layer.
254    ///
255    /// In production, loopback addresses are rejected because they are not
256    /// routable. Enable this for local devnets and testnets where all nodes
257    /// run on the same machine.
258    ///
259    /// Default: `false`
260    #[serde(default)]
261    pub allow_loopback: bool,
262
263    /// Adaptive DHT configuration (trust-based swap-out).
264    ///
265    /// Controls whether peers with low trust scores are eligible for
266    /// swap-out from the routing table when better candidates arrive. Use
267    /// `NodeConfigBuilder::trust_enforcement` for a simple on/off toggle.
268    ///
269    /// Default: enabled with a swap threshold of 0.35.
270    #[serde(default)]
271    pub adaptive_dht_config: AdaptiveDhtConfig,
272
273    /// Optional path for persisting the close group cache.
274    ///
275    /// Directory for persisting the close group cache.
276    ///
277    /// When set, the node saves its close group peers and their trust
278    /// scores to `{dir}/close_group_cache.json` on shutdown and after
279    /// bootstrap. On startup, cached peers are loaded and contacted
280    /// first, preserving close group consistency across restarts.
281    ///
282    /// When `None`, no close group cache is used.
283    #[serde(default, skip_serializing_if = "Option::is_none")]
284    pub close_group_cache_dir: Option<PathBuf>,
285}
286
/// DHT-specific configuration.
///
/// Field order is kept stable because the derived serde impls follow it.
/// `DHTConfig::validate` enforces the minimum values noted on each field.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DHTConfig {
    /// Kademlia K parameter (bucket size). Must be at least 4.
    pub k_value: usize,

    /// Kademlia alpha parameter (parallelism). Must be at least 1.
    pub alpha_value: usize,

    /// DHT refresh interval. Must be non-zero.
    pub refresh_interval: Duration,
}
299
300// ============================================================================
301// Address Construction Helpers
302// ============================================================================
303
304/// Build QUIC listen addresses based on port, IPv6 preference, and listen mode.
305///
306/// All returned addresses use the QUIC transport — the only transport
307/// currently supported for dialing. When additional transports are added,
308/// extend this function to produce addresses for those transports as well.
309///
310/// `ListenMode::Public` uses unspecified (all-interface) addresses;
311/// `ListenMode::Local` uses loopback addresses.
312#[inline]
313fn build_listen_addrs(port: u16, ipv6_enabled: bool, mode: ListenMode) -> Vec<MultiAddr> {
314    let mut addrs = Vec::with_capacity(if ipv6_enabled { 2 } else { 1 });
315
316    let (v4, v6) = match mode {
317        ListenMode::Public => (
318            std::net::Ipv4Addr::UNSPECIFIED,
319            std::net::Ipv6Addr::UNSPECIFIED,
320        ),
321        ListenMode::Local => (std::net::Ipv4Addr::LOCALHOST, std::net::Ipv6Addr::LOCALHOST),
322    };
323
324    if ipv6_enabled {
325        addrs.push(MultiAddr::quic(std::net::SocketAddr::new(
326            std::net::IpAddr::V6(v6),
327            port,
328        )));
329    }
330
331    addrs.push(MultiAddr::quic(std::net::SocketAddr::new(
332        std::net::IpAddr::V4(v4),
333        port,
334    )));
335
336    addrs
337}
338
339impl NodeConfig {
340    /// Returns the effective user agent string.
341    ///
342    /// If a custom user agent was set, returns that. Otherwise, derives
343    /// the user agent from the node's [`NodeMode`].
344    pub fn user_agent(&self) -> String {
345        self.custom_user_agent
346            .clone()
347            .unwrap_or_else(|| user_agent_for_mode(self.mode))
348    }
349
350    /// Compute the listen addresses from the configuration fields.
351    ///
352    /// The returned addresses are derived from [`local`](Self::local),
353    /// [`port`](Self::port), and [`ipv6`](Self::ipv6).
354    pub fn listen_addrs(&self) -> Vec<MultiAddr> {
355        let mode = if self.local {
356            ListenMode::Local
357        } else {
358            ListenMode::Public
359        };
360        build_listen_addrs(self.port, self.ipv6, mode)
361    }
362
363    /// Create a new NodeConfig with default values
364    ///
365    /// # Errors
366    ///
367    /// Returns an error if default addresses cannot be parsed
368    pub fn new() -> Result<Self> {
369        Ok(Self::default())
370    }
371
372    /// Create a builder for customized NodeConfig construction
373    pub fn builder() -> NodeConfigBuilder {
374        NodeConfigBuilder::default()
375    }
376}
377
378// ============================================================================
379// NodeConfig Builder Pattern
380// ============================================================================
381
382/// Builder for constructing [`NodeConfig`] with a transport-aware fluent API.
383///
384/// Defaults are chosen for quick local development:
385/// - QUIC on a random free port (`0`)
386/// - IPv6 enabled (dual-stack)
387/// - All interfaces (not local-only)
388///
389/// # Examples
390///
391/// ```rust,ignore
392/// // Simplest — QUIC on random port, IPv6 on, all interfaces
393/// let config = NodeConfig::builder().build()?;
394///
395/// // Local dev/test mode (loopback, auto-enables allow_loopback)
396/// let config = NodeConfig::builder()
397///     .local(true)
398///     .build()?;
399/// ```
400#[derive(Debug, Clone)]
401pub struct NodeConfigBuilder {
402    port: u16,
403    ipv6: bool,
404    local: bool,
405    bootstrap_peers: Vec<crate::MultiAddr>,
406    max_connections: Option<usize>,
407    connection_timeout: Option<Duration>,
408    dht_config: Option<DHTConfig>,
409    max_message_size: Option<usize>,
410    mode: NodeMode,
411    custom_user_agent: Option<String>,
412    allow_loopback: Option<bool>,
413    adaptive_dht_config: Option<AdaptiveDhtConfig>,
414    close_group_cache_dir: Option<PathBuf>,
415}
416
417impl Default for NodeConfigBuilder {
418    fn default() -> Self {
419        Self {
420            port: 0,
421            ipv6: true,
422            local: false,
423            bootstrap_peers: Vec::new(),
424            max_connections: None,
425            connection_timeout: None,
426            dht_config: None,
427            max_message_size: None,
428            mode: NodeMode::default(),
429            custom_user_agent: None,
430            allow_loopback: None,
431            adaptive_dht_config: None,
432            close_group_cache_dir: None,
433        }
434    }
435}
436
437impl NodeConfigBuilder {
438    /// Set the listen port. Default: `0` (random free port).
439    pub fn port(mut self, port: u16) -> Self {
440        self.port = port;
441        self
442    }
443
444    /// Enable or disable IPv6 dual-stack. Default: `true`.
445    pub fn ipv6(mut self, enabled: bool) -> Self {
446        self.ipv6 = enabled;
447        self
448    }
449
450    /// Bind to loopback only (`true`) or all interfaces (`false`).
451    ///
452    /// When `true`, automatically enables `allow_loopback` unless explicitly
453    /// overridden via [`Self::allow_loopback`].
454    ///
455    /// Default: `false` (all interfaces).
456    pub fn local(mut self, local: bool) -> Self {
457        self.local = local;
458        self
459    }
460
461    /// Add a bootstrap peer.
462    pub fn bootstrap_peer(mut self, addr: crate::MultiAddr) -> Self {
463        self.bootstrap_peers.push(addr);
464        self
465    }
466
467    /// Set maximum connections.
468    pub fn max_connections(mut self, max: usize) -> Self {
469        self.max_connections = Some(max);
470        self
471    }
472
473    /// Set connection timeout.
474    pub fn connection_timeout(mut self, timeout: Duration) -> Self {
475        self.connection_timeout = Some(timeout);
476        self
477    }
478
479    /// Set DHT configuration.
480    pub fn dht_config(mut self, config: DHTConfig) -> Self {
481        self.dht_config = Some(config);
482        self
483    }
484
485    /// Set maximum application-layer message size in bytes.
486    ///
487    /// If this method is not called, saorsa-transport's built-in default is used.
488    pub fn max_message_size(mut self, max_message_size: usize) -> Self {
489        self.max_message_size = Some(max_message_size);
490        self
491    }
492
493    /// Set the operating mode (Node or Client).
494    pub fn mode(mut self, mode: NodeMode) -> Self {
495        self.mode = mode;
496        self
497    }
498
499    /// Set a custom user agent string, overriding the mode-derived default.
500    pub fn custom_user_agent(mut self, user_agent: impl Into<String>) -> Self {
501        self.custom_user_agent = Some(user_agent.into());
502        self
503    }
504
505    /// Explicitly control whether loopback addresses are allowed in the
506    /// transport layer. When not called, `local(true)` auto-enables this;
507    /// `local(false)` defaults to `false`.
508    pub fn allow_loopback(mut self, allow: bool) -> Self {
509        self.allow_loopback = Some(allow);
510        self
511    }
512
513    /// Enable or disable trust-based peer swap-out.
514    ///
515    /// When `false`, peers are never swapped out of the routing table
516    /// based on trust scores. Trust scores are still tracked but have
517    /// no enforcement effect.
518    ///
519    /// When `true` (the default), peers whose trust score falls below the
520    /// swap threshold (0.35) become eligible for replacement when a
521    /// better candidate arrives.
522    ///
523    /// For fine-grained control over the threshold, use
524    /// [`adaptive_dht_config`](Self::adaptive_dht_config) instead.
525    pub fn trust_enforcement(mut self, enabled: bool) -> Self {
526        let threshold = if enabled {
527            AdaptiveDhtConfig::default().swap_threshold
528        } else {
529            0.0
530        };
531        self.adaptive_dht_config = Some(AdaptiveDhtConfig {
532            swap_threshold: threshold,
533        });
534        self
535    }
536
537    /// Set the full adaptive DHT configuration.
538    ///
539    /// Overrides any previous call to [`trust_enforcement`](Self::trust_enforcement).
540    pub fn adaptive_dht_config(mut self, config: AdaptiveDhtConfig) -> Self {
541        self.adaptive_dht_config = Some(config);
542        self
543    }
544
545    /// Set the directory for persisting the close group cache.
546    ///
547    /// The node writes `close_group_cache.json` inside this directory on
548    /// shutdown and after bootstrap, and loads it on startup.
549    pub fn close_group_cache_dir(mut self, path: impl Into<PathBuf>) -> Self {
550        self.close_group_cache_dir = Some(path.into());
551        self
552    }
553
554    /// Build the [`NodeConfig`].
555    ///
556    /// # Errors
557    ///
558    /// Returns an error if address construction fails.
559    pub fn build(self) -> Result<NodeConfig> {
560        // local mode auto-enables allow_loopback unless explicitly overridden
561        let allow_loopback = self.allow_loopback.unwrap_or(self.local);
562
563        Ok(NodeConfig {
564            local: self.local,
565            port: self.port,
566            ipv6: self.ipv6,
567            bootstrap_peers: self.bootstrap_peers,
568            connection_timeout: self
569                .connection_timeout
570                .unwrap_or(Duration::from_secs(DEFAULT_CONNECTION_TIMEOUT_SECS)),
571            max_connections: self.max_connections.unwrap_or(DEFAULT_MAX_CONNECTIONS),
572            dht_config: self.dht_config.unwrap_or_default(),
573            bootstrap_cache_config: None,
574            diversity_config: None,
575            max_message_size: self.max_message_size,
576            node_identity: None,
577            mode: self.mode,
578            custom_user_agent: self.custom_user_agent,
579            allow_loopback,
580            adaptive_dht_config: self.adaptive_dht_config.unwrap_or_default(),
581            close_group_cache_dir: self.close_group_cache_dir,
582        })
583    }
584}
585
586impl Default for NodeConfig {
587    fn default() -> Self {
588        Self {
589            local: false,
590            port: DEFAULT_LISTEN_PORT,
591            ipv6: true,
592            bootstrap_peers: Vec::new(),
593            connection_timeout: Duration::from_secs(DEFAULT_CONNECTION_TIMEOUT_SECS),
594            max_connections: DEFAULT_MAX_CONNECTIONS,
595            dht_config: DHTConfig::default(),
596            bootstrap_cache_config: None,
597            diversity_config: None,
598            max_message_size: None,
599            node_identity: None,
600            mode: NodeMode::default(),
601            custom_user_agent: None,
602            allow_loopback: false,
603            adaptive_dht_config: AdaptiveDhtConfig::default(),
604            close_group_cache_dir: None,
605        }
606    }
607}
608
609impl DHTConfig {
610    /// Default K value (bucket size) for Kademlia routing.
611    pub const DEFAULT_K_VALUE: usize = 20;
612    const DEFAULT_ALPHA_VALUE: usize = 3;
613    const DEFAULT_REFRESH_INTERVAL_SECS: u64 = 600;
614    /// Minimum k_value — values below this produce degenerate routing behavior.
615    const MIN_K_VALUE: usize = 4;
616
617    /// Validate parameter safety constraints (Section 4 points 1-13).
618    ///
619    /// Returns `Err` if any constraint is violated.
620    pub fn validate(&self) -> Result<()> {
621        if self.k_value < Self::MIN_K_VALUE {
622            return Err(P2PError::Validation(
623                format!(
624                    "k_value must be >= {} (got {}), values below {} produce degenerate behavior",
625                    Self::MIN_K_VALUE,
626                    self.k_value,
627                    Self::MIN_K_VALUE,
628                )
629                .into(),
630            ));
631        }
632        if self.alpha_value < 1 {
633            return Err(P2PError::Validation(
634                format!("alpha_value must be >= 1 (got {})", self.alpha_value).into(),
635            ));
636        }
637        if self.refresh_interval.is_zero() {
638            return Err(P2PError::Validation("refresh_interval must be > 0".into()));
639        }
640        Ok(())
641    }
642}
643
644impl Default for DHTConfig {
645    fn default() -> Self {
646        Self {
647            k_value: Self::DEFAULT_K_VALUE,
648            alpha_value: Self::DEFAULT_ALPHA_VALUE,
649            refresh_interval: Duration::from_secs(Self::DEFAULT_REFRESH_INTERVAL_SECS),
650        }
651    }
652}
653
/// Information about a connected peer.
#[derive(Debug, Clone)]
pub struct PeerInfo {
    /// Transport-level channel identifier (internal use only).
    // NOTE(review): `dead_code` is presumably allowed because nothing in the
    // crate reads this field yet (the derived `Debug` does not count as a
    // read) — confirm, and drop the allow once it is actually used.
    #[allow(dead_code)]
    pub(crate) channel_id: String,

    /// Peer's addresses.
    pub addresses: Vec<MultiAddr>,

    /// When the connection was established (monotonic `tokio::time::Instant`).
    pub connected_at: Instant,

    /// Last-seen timestamp (monotonic `tokio::time::Instant`).
    pub last_seen: Instant,

    /// Current connection status.
    pub status: ConnectionStatus,

    /// Protocols the peer supports.
    pub protocols: Vec<String>,

    /// Number of heartbeats received from this peer.
    pub heartbeat_count: u64,
}
679
/// Connection status for a peer.
///
/// Every payload is a `String`, so equality is a total equivalence relation.
/// `Eq` is derived alongside `PartialEq` so the status can be used wherever
/// `Eq` is required.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionStatus {
    /// Connection is being established.
    Connecting,
    /// Connection is established and active.
    Connected,
    /// Connection is being closed.
    Disconnecting,
    /// Connection is closed.
    Disconnected,
    /// Connection failed; carries an accompanying message.
    Failed(String),
}
694
/// Network events that can occur in the P2P system.
///
/// Events are broadcast to all listeners and provide real-time
/// notifications of network state changes and message arrivals.
#[derive(Debug, Clone)]
pub enum P2PEvent {
    /// Message received from a peer on a specific topic.
    Message {
        /// Topic or channel the message was sent on.
        topic: String,
        /// For signed messages this is the authenticated app-level [`PeerId`];
        /// `None` for unsigned messages.
        source: Option<PeerId>,
        /// IP transport address that delivered this message, when known.
        ///
        /// This is provenance metadata, not an identity signal: use `source`
        /// to identify the sender.
        transport_source: Option<MultiAddr>,
        /// Raw message data payload.
        data: Vec<u8>,
    },
    /// An authenticated peer has connected (first signed message verified on any channel).
    /// The `String` is the peer's user agent (e.g. `"node/0.12.1"`, `"client/1.0"`).
    PeerConnected(PeerId, String),
    /// An authenticated peer has fully disconnected (all channels closed).
    PeerDisconnected(PeerId),
}
721
/// Response from a peer to a request sent via [`P2PNode::send_request`].
///
/// Contains the response payload along with metadata about the responder
/// and round-trip latency.
#[derive(Debug, Clone)]
pub struct PeerResponse {
    /// The peer that sent the response.
    pub peer_id: PeerId,
    /// Raw response payload bytes.
    pub data: Vec<u8>,
    /// Round-trip latency from sending the request to receiving the response.
    pub latency: Duration,
}
735
/// Wire format for request/response correlation.
///
/// Wraps application payloads with a message ID and direction flag
/// so the receive loop can route responses back to waiting callers.
///
/// Field order is kept stable because the derived serde impls follow it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct RequestResponseEnvelope {
    /// Unique identifier to correlate request ↔ response.
    pub(crate) message_id: String,
    /// `false` for requests, `true` for responses.
    pub(crate) is_response: bool,
    /// Application payload.
    pub(crate) payload: Vec<u8>,
}
749
/// An in-flight request awaiting a response from a specific peer.
pub(crate) struct PendingRequest {
    /// One-shot sender used to deliver the response payload to the waiting caller.
    pub(crate) response_tx: tokio::sync::oneshot::Sender<Vec<u8>>,
    /// The peer we expect the response from, so responses from any other
    /// peer can be rejected (origin validation).
    pub(crate) expected_peer: PeerId,
}
757
/// Short grace period (100 ms) after closing stale QUIC connections before re-dialing.
///
/// `disconnect_channel` is async and waits for the QUIC close, but the
/// transport endpoint may need a moment to fully release internal state.
/// Only applied when stale channels were actually disconnected.
const QUIC_TEARDOWN_GRACE: Duration = Duration::from_millis(100);
764
/// Main P2P network node that manages connections, routing, and communication.
///
/// This struct represents a complete P2P network participant that can:
/// - Connect to other peers via QUIC transport
/// - Participate in distributed hash table (DHT) operations
/// - Send and receive messages through various protocols
/// - Handle network events and peer lifecycle
///
/// Transport concerns (connections, messaging, events) are delegated to
/// `TransportHandle`.
pub struct P2PNode {
    /// Node configuration.
    config: NodeConfig,

    /// Our peer ID (derived from the node's cryptographic identity in `new`).
    peer_id: PeerId,

    /// Transport handle owning all QUIC / peer / event state.
    transport: Arc<crate::transport_handle::TransportHandle>,

    /// Node start time (monotonic).
    start_time: Instant,

    /// Shutdown token — cancelled when the node should stop.
    shutdown: CancellationToken,

    /// Adaptive DHT layer — owns both the DHT manager and the trust engine.
    /// All DHT operations and trust signals go through this component.
    adaptive_dht: AdaptiveDHT,

    /// Bootstrap cache manager for peer discovery.
    bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,

    /// Bootstrap state tracking — indicates whether peer discovery has completed.
    is_bootstrapped: Arc<AtomicBool>,

    /// Whether `start()` has been called (and `stop()` has not yet completed).
    is_started: Arc<AtomicBool>,

    /// Per-peer locks that serialise reconnect attempts so concurrent sends
    /// to the same stale peer don't race to dial.  Entries accumulate over
    /// the node's lifetime; each is a lightweight `Arc<TokioMutex<()>>`.
    // NOTE(review): the outer map uses a synchronous `parking_lot` mutex, so
    // it presumably must only be held briefly (never across an `.await`);
    // the per-peer async mutexes are what get held while dialing — confirm.
    reconnect_locks: ParkingMutex<HashMap<PeerId, Arc<TokioMutex<()>>>>,

    /// The peer ID of the node currently relaying traffic for us (ADR-014).
    ///
    /// Set after the reachability classifier acquires a relay in `start()`.
    /// The relayer monitor watches this against the K-closest set: if the
    /// relayer drops out, it triggers rebinding.
    ///
    /// `None` when the node is publicly reachable (no relay needed) or
    /// before classification has run.
    relayer_peer_id: Arc<RwLock<Option<PeerId>>>,

    /// The relay-allocated public address (ADR-014).
    ///
    /// Set after a proactive MASQUE relay is acquired in `start()`. This is
    /// the address that external peers must dial to reach this node through
    /// the relay. `None` when the node is publicly reachable (no relay) or
    /// before classification has run.
    relay_address: Arc<RwLock<Option<SocketAddr>>>,
}
827
/// Rewrite a wildcard (unspecified) bind address into a dialable loopback one.
///
/// saorsa-transport rejects unspecified addresses (`0.0.0.0`, `[::]`) as
/// remote targets, because they are only meaningful for binding. This maps
/// them onto the loopback address of the same family and keeps the port:
///
/// - `[::]:port`    → `[::1]:port`
/// - `0.0.0.0:port` → `127.0.0.1:port`
///
/// Any other address is returned unchanged.
pub(crate) fn normalize_wildcard_to_loopback(addr: std::net::SocketAddr) -> std::net::SocketAddr {
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};

    // Concrete addresses are already dialable.
    if !addr.ip().is_unspecified() {
        return addr;
    }

    let loopback: IpAddr = if addr.is_ipv6() {
        Ipv6Addr::LOCALHOST.into()
    } else {
        Ipv4Addr::LOCALHOST.into()
    };
    SocketAddr::new(loopback, addr.port())
}
858
859impl P2PNode {
860    /// Create a new P2P node with the given configuration
861    pub async fn new(config: NodeConfig) -> Result<Self> {
862        // Ensure a cryptographic identity exists — generate one if not provided.
863        let node_identity = match config.node_identity.clone() {
864            Some(identity) => identity,
865            None => Arc::new(NodeIdentity::generate()?),
866        };
867
868        // Derive the canonical peer ID from the cryptographic identity.
869        let peer_id = *node_identity.peer_id();
870
871        // Validate parameter safety constraints (Section 4 points 1-13).
872        // Reject invalid config early, before any resources are allocated.
873        config.dht_config.validate()?;
874        if let Some(ref diversity) = config.diversity_config {
875            diversity
876                .validate()
877                .map_err(|e| P2PError::Validation(format!("IP diversity config: {e}").into()))?;
878        }
879
880        // Initialize bootstrap cache manager
881        let bootstrap_config = config.bootstrap_cache_config.clone().unwrap_or_default();
882        let bootstrap_manager =
883            match BootstrapManager::with_node_config(bootstrap_config, &config).await {
884                Ok(manager) => Some(Arc::new(RwLock::new(manager))),
885                Err(e) => {
886                    warn!("Failed to initialize bootstrap manager: {e}, continuing without cache");
887                    None
888                }
889            };
890
891        // Build transport handle with all transport-level concerns
892        let transport_config = crate::transport_handle::TransportConfig::from_node_config(
893            &config,
894            crate::DEFAULT_EVENT_CHANNEL_CAPACITY,
895            node_identity.clone(),
896        );
897        let transport =
898            Arc::new(crate::transport_handle::TransportHandle::new(transport_config).await?);
899
900        // Initialize AdaptiveDHT — creates the trust engine and DHT manager
901        let dht_manager_config = DhtNetworkConfig {
902            peer_id,
903            node_config: config.clone(),
904            request_timeout: config.connection_timeout,
905            max_concurrent_operations: MAX_ACTIVE_REQUESTS,
906            enable_security: true,
907            swap_threshold: 0.0, // Set by AdaptiveDHT::new() from AdaptiveDhtConfig
908        };
909        let adaptive_dht = AdaptiveDHT::new(
910            transport.clone(),
911            dht_manager_config,
912            config.adaptive_dht_config.clone(),
913        )
914        .await?;
915
916        let node = Self {
917            config,
918            peer_id,
919            transport,
920            start_time: Instant::now(),
921            shutdown: CancellationToken::new(),
922            adaptive_dht,
923            bootstrap_manager,
924            is_bootstrapped: Arc::new(AtomicBool::new(false)),
925            is_started: Arc::new(AtomicBool::new(false)),
926            reconnect_locks: ParkingMutex::new(HashMap::new()),
927            relayer_peer_id: Arc::new(RwLock::new(None)),
928            relay_address: Arc::new(RwLock::new(None)),
929        };
930        info!(
931            "Created P2P node with peer ID: {} (call start() to begin networking)",
932            node.peer_id
933        );
934
935        Ok(node)
936    }
937
938    /// Get the peer ID of this node.
939    pub fn peer_id(&self) -> &PeerId {
940        &self.peer_id
941    }
942
943    /// Get the transport handle for sharing with other components.
944    pub fn transport(&self) -> &Arc<crate::transport_handle::TransportHandle> {
945        &self.transport
946    }
947
948    /// The relay-allocated public address, if this node acquired a MASQUE relay.
949    ///
950    /// Returns `Some(addr)` when the node is behind NAT and successfully
951    /// acquired a proactive relay during `start()`. External peers must dial
952    /// this address to reach the node through the relay. Returns `None` when
953    /// the node is publicly reachable or no relay was established.
954    pub async fn relay_address(&self) -> Option<SocketAddr> {
955        *self.relay_address.read().await
956    }
957
958    pub fn local_addr(&self) -> Option<MultiAddr> {
959        self.transport.local_addr()
960    }
961
962    /// Check if the node has completed the initial bootstrap process
963    ///
964    /// Returns `true` if the node has successfully connected to at least one
965    /// bootstrap peer and performed peer discovery (FIND_NODE).
966    pub fn is_bootstrapped(&self) -> bool {
967        self.is_bootstrapped.load(Ordering::SeqCst)
968    }
969
970    /// Manually trigger re-bootstrap (useful for recovery or network rejoin)
971    ///
972    /// This clears the bootstrapped state and attempts to reconnect to
973    /// bootstrap peers and discover new peers.
974    pub async fn re_bootstrap(&self) -> Result<()> {
975        self.is_bootstrapped.store(false, Ordering::SeqCst);
976        self.connect_bootstrap_peers(None).await
977    }
978
979    // =========================================================================
980    // Trust API — delegates to AdaptiveDHT
981    // =========================================================================
982
983    /// Get the trust engine for advanced use cases
984    pub fn trust_engine(&self) -> Arc<TrustEngine> {
985        self.adaptive_dht.trust_engine().clone()
986    }
987
988    /// Report a trust event for a peer.
989    ///
990    /// Core only records penalties (connection failures). Positive trust
991    /// signals are the consumer's responsibility via [`TrustEvent::ApplicationSuccess`].
992    ///
993    /// # Example
994    ///
995    /// ```rust,ignore
996    /// use saorsa_core::adaptive::TrustEvent;
997    ///
998    /// node.report_trust_event(&peer_id, TrustEvent::ApplicationSuccess(1.0)).await;
999    /// node.report_trust_event(&peer_id, TrustEvent::ConnectionFailed).await;
1000    /// ```
1001    pub async fn report_trust_event(&self, peer_id: &PeerId, event: TrustEvent) {
1002        self.adaptive_dht.report_trust_event(peer_id, event).await;
1003    }
1004
1005    /// Get the current trust score for a peer (0.0 to 1.0).
1006    ///
1007    /// Returns 0.5 (neutral) for unknown peers.
1008    pub fn peer_trust(&self, peer_id: &PeerId) -> f64 {
1009        self.adaptive_dht.peer_trust(peer_id)
1010    }
1011
1012    /// Get the AdaptiveDHT component for direct access
1013    pub fn adaptive_dht(&self) -> &AdaptiveDHT {
1014        &self.adaptive_dht
1015    }
1016
1017    // =========================================================================
1018    // Request/Response API — Automatic Trust Feedback
1019    // =========================================================================
1020
1021    /// Send a request to a peer and wait for a response with automatic trust penalty reporting.
1022    ///
1023    /// Unlike fire-and-forget `send_message()`, this method:
1024    /// 1. Wraps the payload in a `RequestResponseEnvelope` with a unique message ID
1025    /// 2. Sends it on the `/rr/<protocol>` protocol prefix
1026    /// 3. Waits for a matching response (or timeout)
1027    /// 4. Automatically reports failure to the trust engine (success is the expected baseline)
1028    ///
1029    /// The remote peer's handler should call `send_response()` with the
1030    /// incoming message ID to route the response back.
1031    ///
1032    /// # Arguments
1033    ///
1034    /// * `peer_id` - Target peer
1035    /// * `protocol` - Application protocol name (e.g. `"peer_info"`)
1036    /// * `data` - Request payload bytes
1037    /// * `timeout` - Maximum time to wait for a response
1038    ///
1039    /// # Returns
1040    ///
1041    /// A `PeerResponse` on success, or an error on timeout / connection failure.
1042    ///
1043    /// # Example
1044    ///
1045    /// ```rust,ignore
1046    /// let response = node.send_request(&peer_id, "peer_info", request_data, Duration::from_secs(10)).await?;
1047    /// println!("Got {} bytes from {}", response.data.len(), response.peer_id);
1048    /// ```
1049    pub async fn send_request(
1050        &self,
1051        peer_id: &PeerId,
1052        protocol: &str,
1053        data: Vec<u8>,
1054        timeout: Duration,
1055    ) -> Result<PeerResponse> {
1056        match self
1057            .transport
1058            .send_request(peer_id, protocol, data, timeout)
1059            .await
1060        {
1061            Ok(resp) => Ok(resp),
1062            Err(e) => {
1063                let event = if matches!(&e, P2PError::Timeout(_)) {
1064                    TrustEvent::ConnectionTimeout
1065                } else {
1066                    TrustEvent::ConnectionFailed
1067                };
1068                self.report_trust_event(peer_id, event).await;
1069                Err(e)
1070            }
1071        }
1072    }
1073
1074    pub async fn send_response(
1075        &self,
1076        peer_id: &PeerId,
1077        protocol: &str,
1078        message_id: &str,
1079        data: Vec<u8>,
1080    ) -> Result<()> {
1081        self.transport
1082            .send_response(peer_id, protocol, message_id, data)
1083            .await
1084    }
1085
1086    pub fn parse_request_envelope(data: &[u8]) -> Option<(String, bool, Vec<u8>)> {
1087        crate::transport_handle::TransportHandle::parse_request_envelope(data)
1088    }
1089
1090    pub async fn subscribe(&self, topic: &str) -> Result<()> {
1091        self.transport.subscribe(topic).await
1092    }
1093
1094    pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
1095        self.transport.publish(topic, data).await
1096    }
1097
1098    /// Get the node configuration
1099    pub fn config(&self) -> &NodeConfig {
1100        &self.config
1101    }
1102
1103    /// Start the P2P node
1104    pub async fn start(&self) -> Result<()> {
1105        info!("Starting P2P node...");
1106
1107        // Start bootstrap manager background tasks
1108        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1109            let mut manager = bootstrap_manager.write().await;
1110            manager
1111                .start_maintenance()
1112                .map_err(|e| protocol_error(format!("Failed to start bootstrap manager: {e}")))?;
1113            info!("Bootstrap cache manager started");
1114        }
1115
1116        // Start transport listeners and message receiving
1117        self.transport.start_network_listeners().await?;
1118
1119        // Start the adaptive DHT layer (DHT manager + trust engine)
1120        self.adaptive_dht.start().await?;
1121
1122        // Log current listen addresses
1123        let listen_addrs = self.transport.listen_addrs().await;
1124        info!("P2P node started on addresses: {:?}", listen_addrs);
1125
1126        // NOTE: Message receiving is now integrated into the accept loop in start_network_listeners()
1127        // The old start_message_receiving_system() is no longer needed as it competed with the accept
1128        // loop for incoming connections, causing messages to be lost.
1129
1130        // Load close group cache and import trust scores before connecting to peers.
1131        // This ensures trust scores are available when peers are added to the routing table.
1132        let close_group_cache = if let Some(ref dir) = self.config.close_group_cache_dir {
1133            match CloseGroupCache::load_from_dir(dir).await {
1134                Ok(Some(cache)) => {
1135                    // Filter out peers with non-finite trust scores (NaN/Inf)
1136                    // that could corrupt trust engine state or sort ordering.
1137                    let original_count = cache.peers.len();
1138                    let cache = CloseGroupCache {
1139                        peers: cache
1140                            .peers
1141                            .into_iter()
1142                            .filter(|p| p.trust.score.is_finite())
1143                            .collect(),
1144                        ..cache
1145                    };
1146                    let filtered_count = original_count - cache.peers.len();
1147                    if filtered_count > 0 {
1148                        warn!(
1149                            "Filtered {filtered_count} peers with non-finite trust scores from close group cache"
1150                        );
1151                    }
1152
1153                    let trust_snapshot = TrustSnapshot {
1154                        peers: cache
1155                            .peers
1156                            .iter()
1157                            .map(|p| (p.peer_id, p.trust.clone()))
1158                            .collect(),
1159                    };
1160                    self.adaptive_dht
1161                        .trust_engine()
1162                        .import_snapshot(&trust_snapshot);
1163                    info!(
1164                        "Loaded {} peers from close group cache (trust scores imported)",
1165                        cache.peers.len()
1166                    );
1167                    Some(cache)
1168                }
1169                Ok(None) => {
1170                    debug!(
1171                        "No close group cache found in {}, fresh start",
1172                        dir.display()
1173                    );
1174                    None
1175                }
1176                Err(e) => {
1177                    warn!(
1178                        "Failed to load close group cache from {}: {e}",
1179                        dir.display()
1180                    );
1181                    None
1182                }
1183            }
1184        } else {
1185            None
1186        };
1187
1188        // Connect to bootstrap peers
1189        self.connect_bootstrap_peers(close_group_cache.as_ref())
1190            .await?;
1191
1192        // Emit BootstrapComplete — the node is connected to the network and
1193        // the DHT routing table is populated; consumers waiting on this
1194        // event can start issuing queries. The relay-acquisition driver
1195        // runs asynchronously after this point, so the node's published
1196        // self-record may be direct-only for a brief window until the
1197        // driver's first acquisition attempt finishes.
1198        {
1199            let dht = self.adaptive_dht.dht_manager();
1200            let rt_size = dht.get_routing_table_size().await;
1201            dht.emit_event(DhtNetworkEvent::BootstrapComplete { num_peers: rt_size });
1202        }
1203
1204        // Spawn the relay-acquisition driver for Node mode.
1205        //
1206        // The driver unconditionally tries to acquire a MASQUE relay from
1207        // an XOR-closest peer right after bootstrap — there is no public/
1208        // private classification. Private candidates are filtered out
1209        // ambiently: their Direct addresses are unreachable from outside
1210        // their NAT, so the QUIC dial fails and the walker advances to
1211        // the next-closest peer.
1212        //
1213        // The driver also owns the relay-lost → republish → reacquire
1214        // state machine (see `reachability::driver` for the full flow).
1215        // Clients (`NodeMode::Client`) do not run the driver at all: they
1216        // are outbound-only and do not need to be reachable.
1217        if self.config.mode != NodeMode::Client {
1218            spawn_acquisition_driver(
1219                self.adaptive_dht.dht_manager().clone(),
1220                Arc::clone(&self.transport),
1221                Arc::clone(&self.relayer_peer_id),
1222                Arc::clone(&self.relay_address),
1223                self.shutdown.clone(),
1224            );
1225        } else {
1226            info!("client mode — skipping relay acquisition driver");
1227        }
1228
1229        // Spawn background task to forward peer address updates to the DHT.
1230        //
1231        // Two event streams are bridged from the transport layer onto DHT
1232        // routing-table mutations:
1233        //
1234        //  - **Relay established**: when THIS node sets up a MASQUE relay,
1235        //    perform a DHT self-lookup so the transport's re-advertisement
1236        //    loop can ADD_ADDRESS the new relay address to the K closest
1237        //    peers — propagating it beyond peers we already happen to be
1238        //    connected to.
1239        //  - **Peer address update**: when a connected peer advertises a new
1240        //    reachable address via ADD_ADDRESS (typically its relay), update
1241        //    the DHT routing table so future lookups return that address.
1242        //
1243        // Both are handled in a `tokio::select!` against the receiver
1244        // futures so updates propagate immediately. The previous
1245        // implementation polled both queues on a 1-second interval, which
1246        // opened a race window in which a freshly-established relay was
1247        // invisible to outbound DHT queries until the next tick — causing
1248        // the first peers to dial direct (and fail) before learning about
1249        // the relay.
1250        //
1251        // **Slow work isolation**: the relay-propagation path runs an
1252        // iterative DHT lookup (`find_closest_nodes_network`) which can
1253        // take many seconds. Doing it inline in the select loop would
1254        // starve the peer-address-update branch and back up the bounded
1255        // forwarder mpsc into drop territory. Instead, the lookup +
1256        // publish is detached into its own task per relay event, so the
1257        // select loop keeps polling both branches.
1258        // DHT_BRIDGE: forward peer-advertised address updates from the
1259        // transport layer onto DHT routing table mutations. When a connected
1260        // peer's ADD_ADDRESS notification carries a different IP than the
1261        // connection's source (i.e., the peer is behind a relay or has
1262        // migrated), merge the advertised address into the peer's DHT entry.
1263        //
1264        // This node's OWN relay state changes are NOT handled here — the
1265        // relay acquisition driver (see `reachability::driver`) owns them
1266        // directly, so the "relay established" branch no longer belongs to
1267        // the bridge. The driver knows the full typed address set for the
1268        // self-record; the bridge did not.
1269        {
1270            let transport = Arc::clone(&self.transport);
1271            let dht = self.adaptive_dht.dht_manager().clone();
1272            let shutdown = self.shutdown.clone();
1273            tokio::spawn(async move {
1274                loop {
1275                    tokio::select! {
1276                        biased;
1277                        _ = shutdown.cancelled() => break,
1278                        update = transport.recv_peer_address_update() => {
1279                            let Some((peer_addr, advertised_addr)) = update else { break };
1280                            let normalized_peer =
1281                                saorsa_transport::shared::normalize_socket_addr(peer_addr);
1282                            let normalized_adv =
1283                                saorsa_transport::shared::normalize_socket_addr(advertised_addr);
1284                            // Only update DHT when the advertised IP differs
1285                            // from the peer's connection IP. Same-IP updates
1286                            // are just different NATted ports (useless for
1287                            // symmetric NAT); different-IP means a relay.
1288                            if normalized_peer.ip() == normalized_adv.ip() {
1289                                debug!(
1290                                    "DHT_BRIDGE: dropping same-IP update peer={} addr={}",
1291                                    normalized_peer,
1292                                    normalized_adv
1293                                );
1294                                continue;
1295                            }
1296                            info!(
1297                                "DHT_BRIDGE: processing relay update peer={} addr={}",
1298                                normalized_peer,
1299                                normalized_adv
1300                            );
1301                            // Look up peer ID by address (tries both IPv4 and
1302                            // IPv4-mapped IPv6 forms via dual_stack_alternate).
1303                            // For symmetric NAT, this may fail because the
1304                            // connection's channel key uses a different NATted port.
1305                            if let Some(peer_id) = transport.peer_id_for_addr(&normalized_peer).await {
1306                                let multi_addr = MultiAddr::quic(normalized_adv);
1307                                info!(
1308                                    "Updating DHT: peer {} relay address {} (connection was {})",
1309                                    peer_id, advertised_addr, peer_addr
1310                                );
1311                                if !dht
1312                                    .touch_legacy_relay_hint_if_unsequenced(&peer_id, &multi_addr)
1313                                    .await
1314                                {
1315                                    debug!(
1316                                        "DHT_BRIDGE: ignored legacy relay hint for sequenced peer {} addr {}",
1317                                        peer_id, advertised_addr
1318                                    );
1319                                }
1320                            }
1321                        }
1322                    }
1323                }
1324            });
1325        }
1326
1327        self.is_started
1328            .store(true, std::sync::atomic::Ordering::Release);
1329
1330        Ok(())
1331    }
1332
    // `start_network_listeners` is implemented in `TransportHandle`; message
    // receiving (the former `start_message_receiving_system`) is part of its
    // accept loop.
1335
1336    /// Run the P2P node (blocks until shutdown)
1337    pub async fn run(&self) -> Result<()> {
1338        if !self.is_running() {
1339            self.start().await?;
1340        }
1341
1342        info!("P2P node running...");
1343
1344        // Block until shutdown is signalled. All background work (connection
1345        // lifecycle, DHT maintenance, EigenTrust) runs in dedicated tasks.
1346        self.shutdown.cancelled().await;
1347
1348        info!("P2P node stopped");
1349        Ok(())
1350    }
1351
1352    /// Stop the P2P node
1353    pub async fn stop(&self) -> Result<()> {
1354        info!("Stopping P2P node...");
1355
1356        // Save close group cache before tearing down the DHT and transport layers.
1357        if let Some(ref dir) = self.config.close_group_cache_dir
1358            && let Err(e) = self.save_close_group_cache(dir).await
1359        {
1360            warn!("Failed to save close group cache on shutdown: {e}");
1361        }
1362
1363        // Signal the run loop to exit
1364        self.shutdown.cancel();
1365
1366        // Stop DHT layer first so leave messages can be sent while transport is still active.
1367        self.adaptive_dht.stop().await?;
1368
1369        // Stop the transport layer (shutdown endpoints, join tasks, disconnect peers)
1370        self.transport.stop().await?;
1371
1372        self.is_started
1373            .store(false, std::sync::atomic::Ordering::Release);
1374
1375        info!("P2P node stopped");
1376        Ok(())
1377    }
1378
1379    /// Graceful shutdown alias for tests
1380    pub async fn shutdown(&self) -> Result<()> {
1381        self.stop().await
1382    }
1383
1384    /// Check if the node is running
1385    pub fn is_running(&self) -> bool {
1386        self.is_started.load(std::sync::atomic::Ordering::Acquire) && !self.shutdown.is_cancelled()
1387    }
1388
1389    /// Get the current listen addresses
1390    pub async fn listen_addrs(&self) -> Vec<MultiAddr> {
1391        self.transport.listen_addrs().await
1392    }
1393
1394    /// Get connected peers
1395    pub async fn connected_peers(&self) -> Vec<PeerId> {
1396        self.transport.connected_peers().await
1397    }
1398
1399    /// Get peer count
1400    pub async fn peer_count(&self) -> usize {
1401        self.transport.peer_count().await
1402    }
1403
1404    /// Get peer info
1405    pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1406        self.transport.peer_info(peer_id).await
1407    }
1408
1409    /// Get the channel ID for a given address, if connected (internal only).
1410    #[allow(dead_code)]
1411    pub(crate) async fn get_channel_id_by_address(&self, addr: &MultiAddr) -> Option<String> {
1412        self.transport.get_channel_id_by_address(addr).await
1413    }
1414
1415    /// List all active transport-level connections (internal only).
1416    #[allow(dead_code)]
1417    pub(crate) async fn list_active_connections(&self) -> Vec<(String, Vec<MultiAddr>)> {
1418        self.transport.list_active_connections().await
1419    }
1420
1421    /// Remove a channel from the peers map (internal only).
1422    #[allow(dead_code)]
1423    pub(crate) async fn remove_channel(&self, channel_id: &str) -> bool {
1424        self.transport.remove_channel(channel_id).await
1425    }
1426
1427    /// Close a channel's QUIC connection and remove it from all tracking maps.
1428    ///
1429    /// Use when a transport-level connection was established but identity
1430    /// exchange failed, so no [`PeerId`] is available for [`disconnect_peer`].
1431    pub(crate) async fn disconnect_channel(&self, channel_id: &str) {
1432        self.transport.disconnect_channel(channel_id).await;
1433    }
1434
1435    /// Check if an authenticated peer is connected (has at least one active channel).
1436    pub async fn is_peer_connected(&self, peer_id: &PeerId) -> bool {
1437        self.transport.is_peer_connected(peer_id).await
1438    }
1439
1440    /// Connect to a peer, returning the transport-level channel ID.
1441    ///
1442    /// The returned channel ID is **not** the app-level [`PeerId`]. To obtain
1443    /// the authenticated peer identity, call
1444    /// [`wait_for_peer_identity`](Self::wait_for_peer_identity) with the
1445    /// returned channel ID.
1446    ///
1447    /// Callers that already know how the address was classified should
1448    /// prefer [`Self::connect_peer_typed`] so the resulting log line
1449    /// carries an accurate `kind` field instead of `unknown`.
1450    pub async fn connect_peer(&self, address: &MultiAddr) -> Result<String> {
1451        self.transport.connect_peer(address).await
1452    }
1453
1454    /// Connect to a peer at the given typed address.
1455    ///
1456    /// Same as [`Self::connect_peer`] but threads the [`AddressType`]
1457    /// through to the transport-level dial log so an operator can tell,
1458    /// after the fact, whether a failed dial was against a `Direct`,
1459    /// `Relay`, `Unverified`, or `Lan` address.
1460    pub async fn connect_peer_typed(
1461        &self,
1462        address: &MultiAddr,
1463        kind: AddressType,
1464    ) -> Result<String> {
1465        self.transport.connect_peer_typed(address, kind).await
1466    }
1467
1468    /// Wait for the identity exchange on `channel_id` to complete, returning
1469    /// the authenticated [`PeerId`].
1470    ///
1471    /// Use this after [`connect_peer`](Self::connect_peer) to bridge the gap
1472    /// between the transport-level channel ID and the app-level peer identity
1473    /// required by [`send_message`](Self::send_message).
1474    pub async fn wait_for_peer_identity(
1475        &self,
1476        channel_id: &str,
1477        timeout: Duration,
1478    ) -> Result<PeerId> {
1479        self.transport
1480            .wait_for_peer_identity(channel_id, timeout)
1481            .await
1482    }
1483
1484    /// Disconnect from a peer
1485    pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1486        self.transport.disconnect_peer(peer_id).await
1487    }
1488
1489    /// Check if a connection to a peer is active (internal only).
1490    #[allow(dead_code)]
1491    pub(crate) async fn is_connection_active(&self, channel_id: &str) -> bool {
1492        self.transport.is_connection_active(channel_id).await
1493    }
1494
1495    /// Send a message to an authenticated peer, reconnecting on demand.
1496    ///
1497    /// Tries the existing connection first. If the send fails (stale QUIC
1498    /// session, peer not found, etc.), resolves a dial address from:
1499    ///
1500    /// 1. Caller-provided `addrs` (highest priority)
1501    /// 2. Addresses cached in the transport layer (snapshotted before the
1502    ///    send attempt, since stale-channel cleanup removes them)
1503    /// 3. DHT routing table
1504    ///
1505    /// Then dials, waits for identity exchange, and retries the send exactly
1506    /// once on the fresh connection.  Concurrent reconnects to the same peer
1507    /// are serialised so only one dial is attempted at a time.
1508    pub async fn send_message(
1509        &self,
1510        peer_id: &PeerId,
1511        protocol: &str,
1512        data: Vec<u8>,
1513        addrs: &[MultiAddr],
1514    ) -> Result<()> {
1515        // Snapshot channel IDs before the send attempt — transport.send_message
1516        // prunes dead channels from bookkeeping but does NOT close the
1517        // underlying QUIC connection.  We need the original IDs for
1518        // disconnect_channel later.
1519        let existing_channels = self.transport.channels_for_peer(peer_id).await;
1520
1521        // No existing connection — serialise so concurrent sends to the same
1522        // unconnected peer don't each open their own QUIC connection.
1523        if existing_channels.is_empty() {
1524            let lock = self.reconnect_lock_for(peer_id);
1525            let _guard = lock.lock().await;
1526
1527            // Another sender may have connected while we waited for the lock.
1528            if self.transport.is_peer_connected(peer_id).await {
1529                return self.transport.send_message(peer_id, protocol, data).await;
1530            }
1531
1532            return self
1533                .reconnect_and_send(peer_id, protocol, data, addrs, &[], &[])
1534                .await;
1535        }
1536
1537        // Snapshot addresses before the send attempt — transport.send_message
1538        // prunes stale channels, which removes peer_info.
1539        let saved_addrs: Vec<MultiAddr> = self
1540            .transport
1541            .peer_info(peer_id)
1542            .await
1543            .map(|info| info.addresses)
1544            .unwrap_or_default();
1545
1546        // Clone data for retry — only stale-channel failures are retried, but
1547        // transport.send_message consumes the Vec.
1548        let retry_data = data.clone();
1549
1550        // Fast path: try existing connection.
1551        let send_result = self.transport.send_message(peer_id, protocol, data).await;
1552        match send_result {
1553            Ok(()) => return Ok(()),
1554            Err(e) => {
1555                if !e.is_stale_channel_send_failure() {
1556                    debug!(
1557                        peer = %peer_id.to_hex(),
1558                        error = %e,
1559                        "send failed during active channel use, not reconnecting",
1560                    );
1561                    return Err(e);
1562                }
1563
1564                debug!(
1565                    peer = %peer_id.to_hex(),
1566                    error = %e,
1567                    "stale channel send failed, attempting reconnect",
1568                );
1569            }
1570        }
1571
1572        // Serialise reconnect attempts so concurrent sends to the same
1573        // stale peer don't race to dial.
1574        let lock = self.reconnect_lock_for(peer_id);
1575        let _guard = lock.lock().await;
1576
1577        // Another sender may have reconnected while we waited for the lock.
1578        if self.transport.is_peer_connected(peer_id).await {
1579            // Close stale QUIC connections that remove_channel (called inside
1580            // transport.send_message on failure) didn't tear down — it only
1581            // removes bookkeeping, not the underlying QUIC session.
1582            for channel_id in &existing_channels {
1583                self.transport.disconnect_channel(channel_id).await;
1584            }
1585            return self
1586                .transport
1587                .send_message(peer_id, protocol, retry_data)
1588                .await;
1589        }
1590
1591        self.reconnect_and_send(
1592            peer_id,
1593            protocol,
1594            retry_data,
1595            addrs,
1596            &saved_addrs,
1597            &existing_channels,
1598        )
1599        .await
1600    }
1601
1602    /// Tear down stale channels, reconnect to a peer, and send a message.
1603    async fn reconnect_and_send(
1604        &self,
1605        peer_id: &PeerId,
1606        protocol: &str,
1607        data: Vec<u8>,
1608        addrs: &[MultiAddr],
1609        saved_addrs: &[MultiAddr],
1610        stale_channels: &[String],
1611    ) -> Result<()> {
1612        // Resolve a dial address: caller-provided > saved > DHT.
1613        let (address, kind) = self
1614            .resolve_dial_address(peer_id, addrs, saved_addrs)
1615            .await
1616            .ok_or_else(|| {
1617                P2PError::Network(NetworkError::PeerNotFound(peer_id.to_hex().into()))
1618            })?;
1619
1620        // Tear down stale QUIC connections using their actual channel IDs.
1621        // transport.send_message only removes bookkeeping (peer_to_channel,
1622        // peers, active_connections) — it does NOT close the underlying QUIC
1623        // connection.  We must use the real channel IDs, not the resolved
1624        // dial address, because NAT / port migration can make them differ.
1625        if !stale_channels.is_empty() {
1626            for channel_id in stale_channels {
1627                self.transport.disconnect_channel(channel_id).await;
1628            }
1629            tokio::time::sleep(QUIC_TEARDOWN_GRACE).await;
1630        }
1631
1632        // Dial and wait for identity exchange.
1633        let channel_id = self.transport.connect_peer_typed(&address, kind).await?;
1634        let authenticated = match self
1635            .transport
1636            .wait_for_peer_identity(&channel_id, IDENTITY_EXCHANGE_TIMEOUT)
1637            .await
1638        {
1639            Ok(peer) => peer,
1640            Err(e) => {
1641                // Close the freshly-dialed QUIC connection so it doesn't
1642                // linger as a zombie until idle timeout.
1643                self.transport.disconnect_channel(&channel_id).await;
1644                return Err(e);
1645            }
1646        };
1647
1648        if &authenticated != peer_id {
1649            self.transport.disconnect_channel(&channel_id).await;
1650            return Err(P2PError::Identity(IdentityError::IdentityMismatch {
1651                expected: peer_id.to_hex().into(),
1652                actual: authenticated.to_hex().into(),
1653            }));
1654        }
1655
1656        // Send on the fresh connection.
1657        self.transport.send_message(peer_id, protocol, data).await
1658    }
1659
1660    /// Resolve a dial address for `peer_id`, preferring caller-provided
1661    /// addresses over cached/DHT sources.
1662    ///
1663    /// Returns the first dialable (QUIC, non-unspecified) address found,
1664    /// paired with the [`AddressType`] the DHT routing table believes
1665    /// for that address. Caller-provided / saved addresses that don't
1666    /// appear in the routing table fall back to
1667    /// [`AddressType::Unverified`] — the same default the routing table
1668    /// applies to legacy peers that never asserted reachability.
1669    /// Returns `None` when no dialable address is available.
1670    async fn resolve_dial_address(
1671        &self,
1672        peer_id: &PeerId,
1673        caller_addrs: &[MultiAddr],
1674        saved_addrs: &[MultiAddr],
1675    ) -> Option<(MultiAddr, AddressType)> {
1676        // Caller- and saved-supplied addresses skip the routing-table read.
1677        // The kind is only consumed as a log tag by `connect_peer_typed`, so
1678        // defaulting to Unverified — the same fallback the routing table
1679        // applies to legacy peers — saves an async lookup on the hot
1680        // reconnect path. Only consult the DHT when both upstream sources
1681        // are exhausted.
1682        if let Some(addr) = Self::first_dialable(caller_addrs) {
1683            return Some((addr, AddressType::Unverified));
1684        }
1685        if let Some(addr) = Self::first_dialable(saved_addrs) {
1686            return Some((addr, AddressType::Unverified));
1687        }
1688
1689        self.adaptive_dht
1690            .peer_addresses_for_dial_typed(peer_id)
1691            .await
1692            .into_iter()
1693            .find(|(a, _)| {
1694                a.dialable_socket_addr()
1695                    .is_some_and(|sa| !sa.ip().is_unspecified())
1696            })
1697    }
1698
1699    /// Return the first dialable QUIC address from a slice, skipping
1700    /// non-QUIC and unspecified (`0.0.0.0` / `::`) addresses.
1701    fn first_dialable(addrs: &[MultiAddr]) -> Option<MultiAddr> {
1702        addrs
1703            .iter()
1704            .find(|a| {
1705                let dialable = a
1706                    .dialable_socket_addr()
1707                    .is_some_and(|sa| !sa.ip().is_unspecified());
1708                if !dialable {
1709                    trace!(address = %a, "skipping non-dialable address");
1710                }
1711                dialable
1712            })
1713            .cloned()
1714    }
1715
1716    /// Get or create a per-peer reconnect lock.
1717    fn reconnect_lock_for(&self, peer_id: &PeerId) -> Arc<TokioMutex<()>> {
1718        self.reconnect_locks
1719            .lock()
1720            .entry(*peer_id)
1721            .or_insert_with(|| Arc::new(TokioMutex::new(())))
1722            .clone()
1723    }
1724}
1725
1726/// Parse a postcard-encoded protocol message into a `P2PEvent::Message`.
1727///
1728/// Returns `None` if the bytes cannot be deserialized as a valid `WireMessage`.
1729///
1730/// The `from` field is a required part of the wire protocol but is **not**
1731/// used as the event source. Instead, `source` — the transport-level peer ID
1732/// derived from the authenticated QUIC connection — is used so that consumers
1733/// can pass it directly to `send_message()`. This eliminates a spoofing
1734/// vector where a peer could claim an arbitrary identity via the payload.
1735///
1736/// Maximum allowed clock skew for message timestamps.
1737///
1738/// A decentralized network cannot assume participants have accurate clocks.
1739/// Consumer devices commonly have clocks that drift by minutes (no NTP, wrong
1740/// timezone offset applied to UTC, suspended laptops, VMs without guest
1741/// additions, etc.). Both past and future windows must be symmetric and
1742/// generous enough that normal clock drift does not partition the network.
1743///
1744/// 5 minutes in both directions provides replay protection while tolerating
1745/// the clock skew observed in real-world deployments (31-42 seconds was
1746/// measured between a macOS client and NTP-synced VPS nodes).
1747const MAX_MESSAGE_AGE_SECS: u64 = 300;
1748/// Maximum allowed future timestamp — symmetric with the past window.
1749const MAX_FUTURE_SECS: u64 = 300;
1750
1751/// Convenience constructor for `P2PError::Network(NetworkError::ProtocolError(...))`.
1752fn protocol_error(msg: impl std::fmt::Display) -> P2PError {
1753    P2PError::Network(NetworkError::ProtocolError(msg.to_string().into()))
1754}
1755
1756/// Helper to send an event via a broadcast sender, logging at trace level if no receivers.
1757pub(crate) fn broadcast_event(tx: &broadcast::Sender<P2PEvent>, event: P2PEvent) {
1758    if let Err(e) = tx.send(event) {
1759        tracing::trace!("Event broadcast has no receivers: {e}");
1760    }
1761}
1762
1763/// Result of parsing a protocol message, including optional authenticated identity.
1764pub(crate) struct ParsedMessage {
1765    /// The P2P event to broadcast.
1766    pub(crate) event: P2PEvent,
1767    /// If the message was signed and verified, the authenticated app-level [`PeerId`].
1768    pub(crate) authenticated_node_id: Option<PeerId>,
1769    /// The sender's user agent string from the wire message.
1770    pub(crate) user_agent: String,
1771}
1772
1773pub(crate) fn parse_protocol_message(bytes: &[u8], source: &str) -> Option<ParsedMessage> {
1774    let message: WireMessage = postcard::from_bytes(bytes).ok()?;
1775    let transport_source = source.parse::<SocketAddr>().ok().map(MultiAddr::quic);
1776
1777    // Validate timestamp to prevent replay attacks
1778    let now = std::time::SystemTime::now()
1779        .duration_since(std::time::UNIX_EPOCH)
1780        .map(|d| d.as_secs())
1781        .unwrap_or(0);
1782
1783    // Reject messages that are too old (potential replay)
1784    if message.timestamp < now.saturating_sub(MAX_MESSAGE_AGE_SECS) {
1785        tracing::warn!(
1786            "Rejecting stale message from {} (timestamp {} is {} seconds old)",
1787            source,
1788            message.timestamp,
1789            now.saturating_sub(message.timestamp)
1790        );
1791        return None;
1792    }
1793
1794    // Reject messages too far in the future (clock manipulation)
1795    if message.timestamp > now + MAX_FUTURE_SECS {
1796        tracing::warn!(
1797            "Rejecting future-dated message from {} (timestamp {} is {} seconds ahead)",
1798            source,
1799            message.timestamp,
1800            message.timestamp.saturating_sub(now)
1801        );
1802        return None;
1803    }
1804
1805    // Verify app-level signature if present
1806    let authenticated_node_id = if !message.signature.is_empty() {
1807        match verify_message_signature(&message) {
1808            Ok(peer_id) => {
1809                debug!(
1810                    "Message from {} authenticated as app-level NodeId {}",
1811                    source, peer_id
1812                );
1813                Some(peer_id)
1814            }
1815            Err(e) => {
1816                warn!(
1817                    "Rejecting message from {}: signature verification failed: {}",
1818                    source, e
1819                );
1820                return None;
1821            }
1822        }
1823    } else {
1824        None
1825    };
1826
1827    debug!(
1828        "Parsed P2PEvent::Message - topic: {}, source: {:?} (transport: {}, logical: {}), payload_len: {}",
1829        message.protocol,
1830        authenticated_node_id,
1831        source,
1832        message.from,
1833        message.data.len()
1834    );
1835
1836    Some(ParsedMessage {
1837        event: P2PEvent::Message {
1838            topic: message.protocol,
1839            source: authenticated_node_id,
1840            transport_source,
1841            data: message.data,
1842        },
1843        authenticated_node_id,
1844        user_agent: message.user_agent,
1845    })
1846}
1847
1848/// Verify the ML-DSA-65 signature on a WireMessage and return the authenticated [`PeerId`].
1849///
1850/// Besides verifying the cryptographic signature, this also checks that the
1851/// self-asserted `from` field matches the [`PeerId`] derived from the public
1852/// key. This prevents a sender from signing with their real key while
1853/// claiming a different identity in the `from` field.
1854fn verify_message_signature(message: &WireMessage) -> std::result::Result<PeerId, String> {
1855    let pubkey = MlDsaPublicKey::from_bytes(&message.public_key)
1856        .map_err(|e| format!("invalid public key: {e:?}"))?;
1857
1858    let peer_id = peer_id_from_public_key(&pubkey);
1859
1860    // Validate that the self-asserted `from` field matches the public key.
1861    if message.from != peer_id {
1862        return Err(format!(
1863            "from field mismatch: message claims '{}' but public key derives '{}'",
1864            message.from, peer_id
1865        ));
1866    }
1867
1868    let signable = postcard::to_stdvec(&(
1869        &message.protocol,
1870        &message.data as &[u8],
1871        &message.from,
1872        message.timestamp,
1873        &message.user_agent,
1874    ))
1875    .map_err(|e| format!("failed to serialize signable bytes: {e}"))?;
1876
1877    let sig = MlDsaSignature::from_bytes(&message.signature)
1878        .map_err(|e| format!("invalid signature: {e:?}"))?;
1879
1880    let valid = crate::quantum_crypto::ml_dsa_verify(&pubkey, &signable, &sig)
1881        .map_err(|e| format!("verification error: {e}"))?;
1882
1883    if valid {
1884        Ok(peer_id)
1885    } else {
1886        Err("signature is invalid".to_string())
1887    }
1888}
1889
1890impl P2PNode {
1891    /// Subscribe to network events
1892    pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1893        self.transport.subscribe_events()
1894    }
1895
1896    /// Backwards-compat event stream accessor for tests
1897    pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1898        self.subscribe_events()
1899    }
1900
1901    /// Get node uptime
1902    pub fn uptime(&self) -> Duration {
1903        self.start_time.elapsed()
1904    }
1905
    // MCP support has been removed from `P2PNode`. That includes the former
    // remote tool-call handler, the remote tool-listing method, and the MCP
    // server-statistics method.

    // Background tasks (connection_lifecycle_monitor, keepalive, periodic_maintenance)
    // are now implemented in TransportHandle.
1916
1917    /// Check system health
1918    pub async fn health_check(&self) -> Result<()> {
1919        let peer_count = self.peer_count().await;
1920        if peer_count > self.config.max_connections {
1921            Err(protocol_error(format!(
1922                "Too many connections: {peer_count}"
1923            )))
1924        } else {
1925            Ok(())
1926        }
1927    }
1928
1929    /// Get the attached DHT manager.
1930    pub fn dht_manager(&self) -> &Arc<DhtNetworkManager> {
1931        self.adaptive_dht.dht_manager()
1932    }
1933
1934    /// Backwards-compatible alias for `dht_manager()`.
1935    pub fn dht(&self) -> &Arc<DhtNetworkManager> {
1936        self.dht_manager()
1937    }
1938
1939    /// Add a discovered peer to the bootstrap cache
1940    pub async fn add_discovered_peer(
1941        &self,
1942        _peer_id: PeerId,
1943        addresses: Vec<MultiAddr>,
1944    ) -> Result<()> {
1945        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1946            let manager = bootstrap_manager.read().await;
1947            let socket_addresses: Vec<std::net::SocketAddr> = addresses
1948                .iter()
1949                .filter_map(|addr| addr.socket_addr())
1950                .collect();
1951            if let Some(&primary) = socket_addresses.first() {
1952                manager
1953                    .add_peer(&primary, socket_addresses)
1954                    .await
1955                    .map_err(|e| {
1956                        protocol_error(format!("Failed to add peer to bootstrap cache: {e}"))
1957                    })?;
1958            }
1959        }
1960        Ok(())
1961    }
1962
1963    /// Update connection metrics for a peer in the bootstrap cache
1964    pub async fn update_peer_metrics(
1965        &self,
1966        addr: &MultiAddr,
1967        success: bool,
1968        latency_ms: Option<u64>,
1969        _error: Option<String>,
1970    ) -> Result<()> {
1971        if let Some(ref bootstrap_manager) = self.bootstrap_manager
1972            && let Some(sa) = addr.socket_addr()
1973        {
1974            let manager = bootstrap_manager.read().await;
1975            if success {
1976                let rtt_ms = latency_ms.unwrap_or(0) as u32;
1977                manager.record_success(&sa, rtt_ms).await;
1978            } else {
1979                manager.record_failure(&sa).await;
1980            }
1981        }
1982        Ok(())
1983    }
1984
1985    /// Get bootstrap cache statistics
1986    pub async fn get_bootstrap_cache_stats(
1987        &self,
1988    ) -> Result<Option<crate::bootstrap::BootstrapStats>> {
1989        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1990            let manager = bootstrap_manager.read().await;
1991            Ok(Some(manager.stats().await))
1992        } else {
1993            Ok(None)
1994        }
1995    }
1996
1997    /// Get the number of cached bootstrap peers
1998    pub async fn cached_peer_count(&self) -> usize {
1999        if let Some(ref _bootstrap_manager) = self.bootstrap_manager
2000            && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
2001        {
2002            return stats.total_peers;
2003        }
2004        0
2005    }
2006
2007    /// Connect to bootstrap peers and perform initial peer discovery.
2008    ///
2009    /// If a `close_group_cache` was loaded on startup, its peers are injected
2010    /// as the highest-priority addresses (before configured and cached bootstrap
2011    /// peers). Their trust scores were already imported into the `TrustEngine`
2012    /// before this method is called.
2013    async fn connect_bootstrap_peers(
2014        &self,
2015        close_group_cache: Option<&CloseGroupCache>,
2016    ) -> Result<()> {
2017        // Each entry is a list of addresses for a single peer. Close-group
2018        // peers are dialed serially to preserve trust-priority ordering; CLI
2019        // and cached bootstrap peers are dialed concurrently to cut cold-start
2020        // latency when some peers are slow or dead.
2021        let mut serial_addr_sets: Vec<Vec<MultiAddr>> = Vec::new();
2022        let mut parallel_addr_sets: Vec<Vec<MultiAddr>> = Vec::new();
2023        let mut used_cache = false;
2024        let mut seen_addresses = std::collections::HashSet::new();
2025
2026        // Priority 0: Cached close group peers (pre-trusted, highest priority).
2027        // These peers had trust scores loaded into the TrustEngine earlier in start(),
2028        // so they are already known-good when added to the routing table.
2029        // Sorted by trust score (highest first), then XOR distance (closest first)
2030        // as tiebreaker so we reconnect to the most trusted, closest peers first.
2031        if let Some(cache) = close_group_cache {
2032            let mut sorted_peers: Vec<&CachedCloseGroupPeer> = cache.peers.iter().collect();
2033            sorted_peers.sort_by(|a, b| {
2034                // NaN-safe comparison: push NaN scores to the back instead
2035                // of treating them as equal (which would silently promote
2036                // corrupted entries to the front of the reconnection queue).
2037                let score_ord = match b.trust.score.partial_cmp(&a.trust.score) {
2038                    Some(ord) => ord,
2039                    None => {
2040                        if a.trust.score.is_nan() {
2041                            std::cmp::Ordering::Greater // a is NaN, push to back
2042                        } else {
2043                            std::cmp::Ordering::Less // b is NaN, push b to back
2044                        }
2045                    }
2046                };
2047                score_ord.then_with(|| {
2048                    let da = self.peer_id.xor_distance(&a.peer_id);
2049                    let db = self.peer_id.xor_distance(&b.peer_id);
2050                    da.cmp(&db)
2051                })
2052            });
2053
2054            let mut added_from_close_group = 0usize;
2055            for peer in &sorted_peers {
2056                let new_addresses: Vec<MultiAddr> = peer
2057                    .addresses
2058                    .iter()
2059                    .filter(|a| {
2060                        a.dialable_socket_addr()
2061                            .is_some_and(|sa| !seen_addresses.contains(&sa))
2062                    })
2063                    .cloned()
2064                    .collect();
2065
2066                if !new_addresses.is_empty() {
2067                    for addr in &new_addresses {
2068                        if let Some(sa) = addr.socket_addr() {
2069                            seen_addresses.insert(sa);
2070                        }
2071                    }
2072                    serial_addr_sets.push(new_addresses);
2073                    added_from_close_group += 1;
2074                }
2075            }
2076            if added_from_close_group > 0 {
2077                info!(
2078                    "Added {} close group cache peers (highest trust first)",
2079                    added_from_close_group
2080                );
2081            }
2082        }
2083
2084        // Priority 1: Configured bootstrap peers.
2085        if !self.config.bootstrap_peers.is_empty() {
2086            info!(
2087                "Using {} configured bootstrap peers (priority)",
2088                self.config.bootstrap_peers.len()
2089            );
2090            for multiaddr in &self.config.bootstrap_peers {
2091                let Some(socket_addr) = multiaddr.dialable_socket_addr() else {
2092                    warn!("Skipping non-QUIC bootstrap peer: {}", multiaddr);
2093                    continue;
2094                };
2095                seen_addresses.insert(socket_addr);
2096                parallel_addr_sets.push(vec![multiaddr.clone()]);
2097            }
2098        }
2099
2100        // Supplement with cached bootstrap peers (after CLI peers)
2101        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2102            let manager = bootstrap_manager.read().await;
2103            let cached_peers = manager.select_peers(BOOTSTRAP_PEER_BATCH_SIZE).await;
2104            if !cached_peers.is_empty() {
2105                let mut added_from_cache = 0;
2106                for cached in cached_peers {
2107                    let mut addrs = vec![cached.primary_address];
2108                    addrs.extend(cached.addresses);
2109                    // Only add addresses we haven't seen from CLI peers
2110                    let new_addresses: Vec<MultiAddr> = addrs
2111                        .into_iter()
2112                        .filter(|a| !seen_addresses.contains(a))
2113                        .map(MultiAddr::quic)
2114                        .collect();
2115
2116                    if !new_addresses.is_empty() {
2117                        for addr in &new_addresses {
2118                            if let Some(sa) = addr.socket_addr() {
2119                                seen_addresses.insert(sa);
2120                            }
2121                        }
2122                        parallel_addr_sets.push(new_addresses);
2123                        added_from_cache += 1;
2124                    }
2125                }
2126                if added_from_cache > 0 {
2127                    info!(
2128                        "Added {} cached bootstrap peers (supplementing CLI peers)",
2129                        added_from_cache
2130                    );
2131                    used_cache = true;
2132                }
2133            }
2134        }
2135
2136        if serial_addr_sets.is_empty() && parallel_addr_sets.is_empty() {
2137            info!("No bootstrap peers configured and no cached peers available");
2138            return Ok(());
2139        }
2140
2141        // Connect to bootstrap peers, wait for identity exchange, then
2142        // perform DHT peer discovery using the real cryptographic PeerIds.
2143        let identity_timeout = Duration::from_secs(BOOTSTRAP_IDENTITY_TIMEOUT_SECS);
2144        let mut successful_connections = 0;
2145        let mut connected_peer_ids: Vec<PeerId> = Vec::new();
2146
2147        // Phase A: serial close-group dials to preserve trust-priority ordering.
2148        let client_mode = matches!(self.config.mode, NodeMode::Client);
2149        for addrs in &serial_addr_sets {
2150            if let Some(peer_id) = self
2151                .dial_bootstrap_addr_set(addrs, used_cache, identity_timeout)
2152                .await
2153            {
2154                successful_connections += 1;
2155                connected_peer_ids.push(peer_id);
2156                if client_mode && successful_connections >= CLIENT_BOOTSTRAP_TARGET {
2157                    debug!(
2158                        "Client bootstrap target reached ({successful_connections} peers) — skipping remaining serial dials"
2159                    );
2160                    break;
2161                }
2162            }
2163        }
2164
2165        // Phase B: concurrent dials of CLI + cached bootstrap peers, bounded
2166        // by `MAX_CONCURRENT_BOOTSTRAP_DIALS` to cap simultaneous QUIC+PQC
2167        // handshakes. Skipped entirely when a client has already hit its
2168        // target during Phase A.
2169        if !client_mode || successful_connections < CLIENT_BOOTSTRAP_TARGET {
2170            let mut parallel_stream =
2171                futures::stream::iter(parallel_addr_sets.into_iter().map(|addrs| async move {
2172                    self.dial_bootstrap_addr_set(&addrs, used_cache, identity_timeout)
2173                        .await
2174                }))
2175                .buffer_unordered(MAX_CONCURRENT_BOOTSTRAP_DIALS);
2176            while let Some(result) = parallel_stream.next().await {
2177                if let Some(peer_id) = result {
2178                    successful_connections += 1;
2179                    connected_peer_ids.push(peer_id);
2180                    if client_mode && successful_connections >= CLIENT_BOOTSTRAP_TARGET {
2181                        debug!(
2182                            "Client bootstrap target reached ({successful_connections} peers) — cancelling pending dials"
2183                        );
2184                        break;
2185                    }
2186                }
2187            }
2188            // `parallel_stream` is dropped here when the `if` block exits,
2189            // cancelling any in-flight futures inside `buffer_unordered`
2190            // before we proceed to the DHT discovery phase below.
2191        }
2192
2193        if successful_connections == 0 {
2194            // Outbound connections failed — but for nodes behind symmetric NAT,
2195            // the bootstrap peer may have already connected INBOUND to us.
2196            // Wait briefly and check if we have any transport-level connections.
2197            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
2198            let transport_peers = self.transport.connected_peers().await;
2199            if !transport_peers.is_empty() {
2200                info!(
2201                    "No outbound bootstrap succeeded, but {} inbound peer(s) connected — proceeding with DHT bootstrap",
2202                    transport_peers.len()
2203                );
2204                connected_peer_ids = transport_peers;
2205                successful_connections = connected_peer_ids.len();
2206            } else {
2207                if !used_cache {
2208                    warn!("Failed to connect to any bootstrap peers");
2209                }
2210                // Starting a node should not be gated on immediate bootstrap connectivity.
2211                // Keep running and allow background discovery / retries to populate peers later.
2212                return Ok(());
2213            }
2214        }
2215
2216        info!(
2217            "Successfully connected to {} bootstrap peers",
2218            successful_connections
2219        );
2220
2221        // Perform DHT peer discovery from connected bootstrap peers.
2222        match self
2223            .dht_manager()
2224            .bootstrap_from_peers(&connected_peer_ids)
2225            .await
2226        {
2227            Ok(count) => info!("DHT peer discovery found {} peers", count),
2228            Err(e) => warn!("DHT peer discovery failed: {}", e),
2229        }
2230
2231        // Perform two consecutive self-lookups to fully refresh the close
2232        // neighborhood. The second lookup may discover peers that joined or
2233        // became reachable during the first lookup (Section 11.2 step 5).
2234        //
2235        // Client-mode nodes don't serve the DHT, so they don't need an
2236        // accurate close neighborhood — they just need enough peers to route
2237        // lookups for their own requests, which `bootstrap_from_peers` above
2238        // already provides. Skipping the self-lookups here cuts cold-start
2239        // latency by tens of seconds when α-sized batches include dead peers.
2240        if matches!(self.config.mode, NodeMode::Node) {
2241            const SELF_LOOKUP_ROUNDS: u8 = 2;
2242            for i in 1..=SELF_LOOKUP_ROUNDS {
2243                if let Err(e) = self.dht_manager().trigger_self_lookup().await {
2244                    warn!("Post-bootstrap self-lookup {i}/{SELF_LOOKUP_ROUNDS} failed: {e}");
2245                } else {
2246                    debug!("Post-bootstrap self-lookup {i}/{SELF_LOOKUP_ROUNDS} completed");
2247                }
2248            }
2249        } else {
2250            debug!("Skipping post-bootstrap self-lookups (client mode)");
2251        }
2252
2253        // Mark node as bootstrapped - we have connected to bootstrap peers
2254        // and initiated peer discovery
2255        self.is_bootstrapped.store(true, Ordering::SeqCst);
2256        info!(
2257            "Bootstrap complete: connected to {} peers, initiated {} discovery requests",
2258            successful_connections,
2259            connected_peer_ids.len()
2260        );
2261
2262        // Save close group cache after initial bootstrap so a crash before
2263        // graceful shutdown still preserves the newly-discovered close group.
2264        if let Some(ref dir) = self.config.close_group_cache_dir
2265            && let Err(e) = self.save_close_group_cache(dir).await
2266        {
2267            warn!("Failed to save close group cache after bootstrap: {e}");
2268        }
2269
2270        Ok(())
2271    }
2272
2273    /// Dial a single bootstrap peer's address set, stopping at the first
2274    /// address that completes the identity handshake. Records success/failure
2275    /// in the bootstrap cache and returns the remote peer's cryptographic
2276    /// PeerId on success. Safe to call concurrently for different peers.
2277    async fn dial_bootstrap_addr_set(
2278        &self,
2279        addrs: &[MultiAddr],
2280        used_cache: bool,
2281        identity_timeout: Duration,
2282    ) -> Option<PeerId> {
2283        for addr in addrs {
2284            // Bootstrap addresses come from operator-supplied seeds (CLI
2285            // flags, config file, bootstrap cache). The local reachability
2286            // classifier hasn't proven them yet, so log them as
2287            // `Unverified` rather than `unknown`.
2288            match self
2289                .transport
2290                .connect_peer_typed(addr, AddressType::Unverified)
2291                .await
2292            {
2293                Ok(channel_id) => match self
2294                    .transport
2295                    .wait_for_peer_identity(&channel_id, identity_timeout)
2296                    .await
2297                {
2298                    Ok(real_peer_id) => {
2299                        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2300                            let manager = bootstrap_manager.read().await;
2301                            if let Some(sa) = addr.socket_addr() {
2302                                manager.record_success(&sa, 100).await;
2303                            }
2304                        }
2305                        return Some(real_peer_id);
2306                    }
2307                    Err(e) => {
2308                        warn!(
2309                            "Timeout waiting for identity from bootstrap peer {}: {}, \
2310                             closing channel {}",
2311                            addr, e, channel_id
2312                        );
2313                        self.disconnect_channel(&channel_id).await;
2314                    }
2315                },
2316                Err(e) => {
2317                    warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
2318                    if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
2319                        let manager = bootstrap_manager.read().await;
2320                        if let Some(sa) = addr.socket_addr() {
2321                            manager.record_failure(&sa).await;
2322                        }
2323                    }
2324                }
2325            }
2326        }
2327        None
2328    }
2329
2330    /// Persist the current close group peers and their trust scores to disk.
2331    async fn save_close_group_cache(&self, dir: &Path) -> anyhow::Result<()> {
2332        let key: crate::dht::Key = *self.peer_id.as_bytes();
2333        let k_value = self.config.dht_config.k_value;
2334        let close_group = self
2335            .dht_manager()
2336            .find_closest_nodes_local(&key, k_value)
2337            .await;
2338
2339        if close_group.is_empty() {
2340            debug!("No close group peers to save");
2341            return Ok(());
2342        }
2343
2344        let trust_engine = self.adaptive_dht.trust_engine();
2345        let now_epoch = SystemTime::now()
2346            .duration_since(UNIX_EPOCH)
2347            .map(|d| d.as_secs())
2348            .unwrap_or(0);
2349
2350        let peers: Vec<CachedCloseGroupPeer> = close_group
2351            .into_iter()
2352            .filter_map(|dht_node| {
2353                let score = trust_engine.score(&dht_node.peer_id);
2354                // Guard against NaN/Infinity — serde_json cannot round-trip
2355                // non-finite f64 values, which would corrupt the cache file.
2356                if !score.is_finite() {
2357                    return None;
2358                }
2359                Some(CachedCloseGroupPeer {
2360                    peer_id: dht_node.peer_id,
2361                    addresses: dht_node.addresses,
2362                    trust: TrustRecord {
2363                        score,
2364                        last_updated_epoch_secs: now_epoch,
2365                    },
2366                })
2367            })
2368            .collect();
2369
2370        let peer_count = peers.len();
2371        let cache = CloseGroupCache {
2372            peers,
2373            saved_at_epoch_secs: now_epoch,
2374        };
2375
2376        cache.save_to_dir(dir).await?;
2377        info!(
2378            "Saved {} close group peers to cache in {}",
2379            peer_count,
2380            dir.display()
2381        );
2382        Ok(())
2383    }
2384
2385    // disconnect_all_peers and periodic_tasks are now in TransportHandle
2386}
2387
2388/// Network sender trait for sending messages
2389#[async_trait::async_trait]
2390#[allow(dead_code)]
2391pub trait NetworkSender: Send + Sync {
2392    /// Send a message to an authenticated peer.
2393    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;
2394
2395    /// Get our local peer ID (cryptographic identity).
2396    fn local_peer_id(&self) -> PeerId;
2397}
2398
2399// P2PNetworkSender removed — NetworkSender is now implemented directly on TransportHandle.
2400// NodeBuilder removed — use NodeConfigBuilder + P2PNode::new() instead.
2401
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod diversity_tests {
    use super::*;
    use crate::security::IPDiversityConfig;

    /// Build a `BootstrapManager` the way production does, but rooted in a
    /// fresh temporary cache directory. This avoids conflicts with cached
    /// files from the old format.
    ///
    /// The `TempDir` guard is returned alongside the manager. Dropping it
    /// deletes the directory, so callers must keep it alive for as long as the
    /// manager may touch its cache.
    async fn build_bootstrap_manager_like_prod(
        config: &NodeConfig,
    ) -> (BootstrapManager, tempfile::TempDir) {
        let temp_dir = tempfile::TempDir::new().expect("temp dir");
        let mut bootstrap_config = config.bootstrap_cache_config.clone().unwrap_or_default();
        bootstrap_config.cache_dir = temp_dir.path().to_path_buf();

        let manager = BootstrapManager::with_node_config(bootstrap_config, config)
            .await
            .expect("bootstrap manager");
        (manager, temp_dir)
    }

    #[tokio::test]
    async fn test_nodeconfig_diversity_config_used_for_bootstrap() {
        let config = NodeConfig {
            diversity_config: Some(IPDiversityConfig::testnet()),
            ..Default::default()
        };

        // Bind the guard to a named variable (not `_`) so the cache directory
        // survives until the end of the test.
        let (manager, _temp_dir) = build_bootstrap_manager_like_prod(&config).await;
        // Verify testnet config has permissive IP limits
        assert_eq!(manager.diversity_config().max_per_ip, Some(usize::MAX));
        assert_eq!(manager.diversity_config().max_per_subnet, Some(usize::MAX));
    }
}
2432
2433/// Helper function to register a new channel.
2434///
2435/// Sync because the underlying map is a sharded `DashMap` — no `.await` is
2436/// needed to take a write lock. Keeping this sync is what lets the inbound
2437/// accept loop in `TransportHandle` insert without yielding, so it cannot
2438/// stall and back-pressure the upstream handshake channel.
2439pub(crate) fn register_new_channel(
2440    peers: &DashMap<String, PeerInfo>,
2441    channel_id: &str,
2442    remote_addr: &MultiAddr,
2443) {
2444    let peer_info = PeerInfo {
2445        channel_id: channel_id.to_owned(),
2446        addresses: vec![remote_addr.clone()],
2447        connected_at: tokio::time::Instant::now(),
2448        last_seen: tokio::time::Instant::now(),
2449        status: ConnectionStatus::Connected,
2450        protocols: vec!["p2p-core/1.0.0".to_string()],
2451        heartbeat_count: 0,
2452    };
2453    peers.insert(channel_id.to_owned(), peer_info);
2454}
2455
2456#[cfg(test)]
2457mod tests {
2458    use super::*;
2459    // MCP removed from tests
2460    use std::time::Duration;
2461    use tokio::time::timeout;
2462
2463    /// 2 MiB — used in builder tests to verify max_message_size configuration.
2464    const TEST_MAX_MESSAGE_SIZE: usize = 2 * 1024 * 1024;
2465
    // The MCP test tool handler that used to live here was removed together
    // with MCP support.
2469
2470    /// Helper function to create a test node configuration
2471    fn create_test_node_config() -> NodeConfig {
2472        NodeConfig {
2473            local: true,
2474            port: 0,
2475            ipv6: true,
2476            bootstrap_peers: vec![],
2477            connection_timeout: Duration::from_secs(2),
2478            max_connections: 100,
2479            dht_config: DHTConfig::default(),
2480            bootstrap_cache_config: None,
2481            diversity_config: None,
2482            max_message_size: None,
2483            node_identity: None,
2484            mode: NodeMode::default(),
2485            custom_user_agent: None,
2486            allow_loopback: true,
2487            adaptive_dht_config: AdaptiveDhtConfig::default(),
2488            close_group_cache_dir: None,
2489        }
2490    }
2491
    // The MCP test-tool helper that used to live here was removed together
    // with MCP support.
2494
2495    #[tokio::test]
2496    async fn test_node_config_default() {
2497        let config = NodeConfig::default();
2498
2499        assert_eq!(config.listen_addrs().len(), 2); // IPv4 + IPv6
2500        assert_eq!(config.max_connections, 10000);
2501        assert_eq!(config.connection_timeout, Duration::from_secs(25));
2502    }
2503
2504    #[tokio::test]
2505    async fn test_dht_config_default() {
2506        let config = DHTConfig::default();
2507
2508        assert_eq!(config.k_value, 20);
2509        assert_eq!(config.alpha_value, 3);
2510        assert_eq!(config.refresh_interval, Duration::from_secs(600));
2511    }
2512
2513    #[test]
2514    fn test_connection_status_variants() {
2515        let connecting = ConnectionStatus::Connecting;
2516        let connected = ConnectionStatus::Connected;
2517        let disconnecting = ConnectionStatus::Disconnecting;
2518        let disconnected = ConnectionStatus::Disconnected;
2519        let failed = ConnectionStatus::Failed("test error".to_string());
2520
2521        assert_eq!(connecting, ConnectionStatus::Connecting);
2522        assert_eq!(connected, ConnectionStatus::Connected);
2523        assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
2524        assert_eq!(disconnected, ConnectionStatus::Disconnected);
2525        assert_ne!(connecting, connected);
2526
2527        if let ConnectionStatus::Failed(msg) = failed {
2528            assert_eq!(msg, "test error");
2529        } else {
2530            panic!("Expected Failed status");
2531        }
2532    }
2533
2534    #[tokio::test]
2535    async fn test_node_creation() -> Result<()> {
2536        let config = create_test_node_config();
2537        let node = P2PNode::new(config).await?;
2538
2539        // PeerId is derived from the cryptographic identity (32-byte BLAKE3 hash)
2540        assert_eq!(node.peer_id().to_hex().len(), 64);
2541        assert!(!node.is_running());
2542        assert_eq!(node.peer_count().await, 0);
2543        assert!(node.connected_peers().await.is_empty());
2544
2545        Ok(())
2546    }
2547
2548    #[tokio::test]
2549    async fn test_node_lifecycle() -> Result<()> {
2550        let config = create_test_node_config();
2551        let node = P2PNode::new(config).await?;
2552
2553        // Initially not running
2554        assert!(!node.is_running());
2555
2556        // Start the node
2557        node.start().await?;
2558        assert!(node.is_running());
2559
2560        // Check listen addresses were set (at least one)
2561        let listen_addrs = node.listen_addrs().await;
2562        assert!(
2563            !listen_addrs.is_empty(),
2564            "Expected at least one listening address"
2565        );
2566
2567        // Stop the node
2568        node.stop().await?;
2569        assert!(!node.is_running());
2570
2571        Ok(())
2572    }
2573
2574    #[tokio::test]
2575    async fn test_peer_connection() -> Result<()> {
2576        let config1 = create_test_node_config();
2577        let config2 = create_test_node_config();
2578
2579        let node1 = P2PNode::new(config1).await?;
2580        let node2 = P2PNode::new(config2).await?;
2581
2582        node1.start().await?;
2583        node2.start().await?;
2584
2585        let node2_addr = node2
2586            .listen_addrs()
2587            .await
2588            .into_iter()
2589            .find(|a| a.is_ipv4())
2590            .ok_or_else(|| {
2591                P2PError::Network(crate::error::NetworkError::InvalidAddress(
2592                    "Node 2 did not expose an IPv4 listen address".into(),
2593                ))
2594            })?;
2595
2596        // Connect to a real peer (unsigned — no node_identity configured).
2597        // connect_peer returns a transport-level channel ID (String), not a PeerId.
2598        let channel_id = node1.connect_peer(&node2_addr).await?;
2599
2600        // Unauthenticated connections don't appear in the app-level peer maps.
2601        // Verify transport-level tracking via is_connection_active / peers map.
2602        assert!(node1.is_connection_active(&channel_id).await);
2603
2604        // Get peer info from the transport-level peers map (keyed by channel ID)
2605        let peer_info = node1.transport.peer_info_by_channel(&channel_id).await;
2606        assert!(peer_info.is_some());
2607        let info = peer_info.expect("Peer info should exist after connect");
2608        assert_eq!(info.channel_id, channel_id);
2609        assert_eq!(info.status, ConnectionStatus::Connected);
2610        assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
2611
2612        // Disconnect the channel
2613        node1.remove_channel(&channel_id).await;
2614        assert!(!node1.is_connection_active(&channel_id).await);
2615
2616        node1.stop().await?;
2617        node2.stop().await?;
2618
2619        Ok(())
2620    }
2621
2622    #[tokio::test]
2623    async fn test_connect_peer_rejects_tcp_multiaddr() -> Result<()> {
2624        let config = create_test_node_config();
2625        let node = P2PNode::new(config).await?;
2626
2627        let tcp_addr: MultiAddr = "/ip4/127.0.0.1/tcp/1".parse().unwrap();
2628        let result = node.connect_peer(&tcp_addr).await;
2629
2630        assert!(
2631            matches!(
2632                result,
2633                Err(P2PError::Network(
2634                    crate::error::NetworkError::InvalidAddress(_)
2635                ))
2636            ),
2637            "TCP multiaddrs should be rejected before a QUIC dial is attempted, got: {:?}",
2638            result
2639        );
2640
2641        Ok(())
2642    }
2643
2644    // TODO(windows): Investigate QUIC connection issues on Windows CI
2645    // This test consistently fails on Windows GitHub Actions runners with
2646    // "All connect attempts failed" even with IPv4-only config, long delays,
2647    // and multiple retry attempts. The underlying saorsa-transport library may have
2648    // issues on Windows that need investigation.
2649    // See: https://github.com/dirvine/saorsa-core/issues/TBD
2650    #[cfg_attr(target_os = "windows", ignore)]
2651    #[tokio::test]
2652    async fn test_event_subscription() -> Result<()> {
2653        // PeerConnected/PeerDisconnected only fire for authenticated peers
2654        // (nodes with node_identity that send signed messages).
2655        // Configure both nodes with identities so the event subscription test works.
2656        let identity1 =
2657            Arc::new(NodeIdentity::generate().expect("should generate identity for test node1"));
2658        let identity2 =
2659            Arc::new(NodeIdentity::generate().expect("should generate identity for test node2"));
2660
2661        let mut config1 = create_test_node_config();
2662        config1.ipv6 = false;
2663        config1.node_identity = Some(identity1);
2664
2665        let node2_peer_id = *identity2.peer_id();
2666        let mut config2 = create_test_node_config();
2667        config2.ipv6 = false;
2668        config2.node_identity = Some(identity2);
2669
2670        let node1 = P2PNode::new(config1).await?;
2671        let node2 = P2PNode::new(config2).await?;
2672
2673        node1.start().await?;
2674        node2.start().await?;
2675
2676        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
2677
2678        // Subscribe to node2's events (node2 will receive the signed message)
2679        let mut events = node2.subscribe_events();
2680
2681        let node2_addr = node2.local_addr().ok_or_else(|| {
2682            P2PError::Network(crate::error::NetworkError::ProtocolError(
2683                "No listening address".to_string().into(),
2684            ))
2685        })?;
2686
2687        // Connect node1 → node2
2688        let mut channel_id = None;
2689        for attempt in 0..3 {
2690            if attempt > 0 {
2691                tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
2692            }
2693            match timeout(Duration::from_secs(2), node1.connect_peer(&node2_addr)).await {
2694                Ok(Ok(id)) => {
2695                    channel_id = Some(id);
2696                    break;
2697                }
2698                Ok(Err(_)) | Err(_) => continue,
2699            }
2700        }
2701        let channel_id = channel_id.expect("Failed to connect after 3 attempts");
2702
2703        // Wait for identity exchange to complete via wait_for_peer_identity.
2704        let target_peer_id = node1
2705            .wait_for_peer_identity(&channel_id, Duration::from_secs(2))
2706            .await?;
2707        assert_eq!(target_peer_id, node2_peer_id);
2708
2709        // node1 sends a signed message → node2 authenticates → PeerConnected fires on node2
2710        node1
2711            .send_message(&target_peer_id, "test-topic", b"hello".to_vec(), &[])
2712            .await?;
2713
2714        // Check for PeerConnected event on node2
2715        let event = timeout(Duration::from_secs(2), async {
2716            loop {
2717                match events.recv().await {
2718                    Ok(P2PEvent::PeerConnected(id, _)) => return Ok(id),
2719                    Ok(P2PEvent::Message { .. }) => continue, // skip messages
2720                    Ok(_) => continue,
2721                    Err(e) => return Err(e),
2722                }
2723            }
2724        })
2725        .await;
2726        assert!(event.is_ok(), "Should receive PeerConnected event");
2727        let connected_peer_id = event.expect("Timed out").expect("Channel error");
2728        // The connected peer ID should be node1's app-level ID (a valid PeerId)
2729        assert!(
2730            connected_peer_id.0.iter().any(|&b| b != 0),
2731            "PeerConnected should carry a non-zero peer ID"
2732        );
2733
2734        node1.stop().await?;
2735        node2.stop().await?;
2736
2737        Ok(())
2738    }
2739
2740    // TODO(windows): Same QUIC connection issues as test_event_subscription
2741    #[cfg_attr(target_os = "windows", ignore)]
2742    #[tokio::test]
2743    async fn test_message_sending() -> Result<()> {
2744        // Create two nodes (IPv4-only loopback)
2745        let mut config1 = create_test_node_config();
2746        config1.ipv6 = false;
2747        let node1 = P2PNode::new(config1).await?;
2748        node1.start().await?;
2749
2750        let mut config2 = create_test_node_config();
2751        config2.ipv6 = false;
2752        let node2 = P2PNode::new(config2).await?;
2753        node2.start().await?;
2754
2755        // Wait a bit for nodes to start listening
2756        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
2757
2758        // Get actual listening address of node2
2759        let node2_addr = node2.local_addr().ok_or_else(|| {
2760            P2PError::Network(crate::error::NetworkError::ProtocolError(
2761                "No listening address".to_string().into(),
2762            ))
2763        })?;
2764
2765        // Connect node1 to node2
2766        let channel_id =
2767            match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
2768                Ok(res) => res?,
2769                Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2770            };
2771
2772        // Wait for identity exchange via wait_for_peer_identity.
2773        let target_peer_id = node1
2774            .wait_for_peer_identity(&channel_id, Duration::from_secs(2))
2775            .await?;
2776        assert_eq!(target_peer_id, node2.peer_id().clone());
2777
2778        // Send a message
2779        let message_data = b"Hello, peer!".to_vec();
2780        let result = match timeout(
2781            Duration::from_millis(500),
2782            node1.send_message(&target_peer_id, "test-protocol", message_data, &[]),
2783        )
2784        .await
2785        {
2786            Ok(res) => res,
2787            Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2788        };
2789        // For now, we'll just check that we don't get a "not connected" error
2790        // The actual send might fail due to no handler on the other side
2791        if let Err(e) = &result {
2792            assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
2793        }
2794
2795        // Try to send to non-existent peer
2796        let non_existent_peer = PeerId::from_bytes([0xFFu8; 32]);
2797        let result = node1
2798            .send_message(&non_existent_peer, "test-protocol", vec![], &[])
2799            .await;
2800        assert!(result.is_err(), "Sending to non-existent peer should fail");
2801
2802        node1.stop().await?;
2803        node2.stop().await?;
2804
2805        Ok(())
2806    }
2807
2808    #[tokio::test]
2809    async fn test_remote_mcp_operations() -> Result<()> {
2810        let config = create_test_node_config();
2811        let node = P2PNode::new(config).await?;
2812
2813        // MCP removed; test reduced to simple start/stop
2814        node.start().await?;
2815        node.stop().await?;
2816        Ok(())
2817    }
2818
2819    #[tokio::test]
2820    async fn test_health_check() -> Result<()> {
2821        let config = create_test_node_config();
2822        let node = P2PNode::new(config).await?;
2823
2824        // Health check should pass with no connections
2825        let result = node.health_check().await;
2826        assert!(result.is_ok());
2827
2828        // Note: We're not actually connecting to real peers here
2829        // since that would require running bootstrap nodes.
2830        // The health check should still pass with no connections.
2831
2832        Ok(())
2833    }
2834
2835    #[tokio::test]
2836    async fn test_node_uptime() -> Result<()> {
2837        let config = create_test_node_config();
2838        let node = P2PNode::new(config).await?;
2839
2840        let uptime1 = node.uptime();
2841        assert!(uptime1 >= Duration::from_secs(0));
2842
2843        // Wait a bit
2844        tokio::time::sleep(Duration::from_millis(10)).await;
2845
2846        let uptime2 = node.uptime();
2847        assert!(uptime2 > uptime1);
2848
2849        Ok(())
2850    }
2851
2852    #[tokio::test]
2853    async fn test_node_config_access() -> Result<()> {
2854        let config = create_test_node_config();
2855        let node = P2PNode::new(config).await?;
2856
2857        let node_config = node.config();
2858        assert_eq!(node_config.max_connections, 100);
2859        // MCP removed
2860
2861        Ok(())
2862    }
2863
2864    #[tokio::test]
2865    async fn test_mcp_server_access() -> Result<()> {
2866        let config = create_test_node_config();
2867        let _node = P2PNode::new(config).await?;
2868
2869        // MCP removed
2870        Ok(())
2871    }
2872
2873    #[tokio::test]
2874    async fn test_dht_access() -> Result<()> {
2875        let config = create_test_node_config();
2876        let node = P2PNode::new(config).await?;
2877
2878        // DHT is always available
2879        let _dht = node.dht();
2880
2881        Ok(())
2882    }
2883
2884    #[tokio::test]
2885    async fn test_node_config_builder() -> Result<()> {
2886        let bootstrap: MultiAddr = "/ip4/127.0.0.1/udp/9000/quic".parse().unwrap();
2887
2888        let config = NodeConfig::builder()
2889            .local(true)
2890            .ipv6(true)
2891            .bootstrap_peer(bootstrap)
2892            .connection_timeout(Duration::from_secs(15))
2893            .max_connections(200)
2894            .max_message_size(TEST_MAX_MESSAGE_SIZE)
2895            .build()?;
2896
2897        assert_eq!(config.listen_addrs().len(), 2); // IPv4 + IPv6
2898        assert!(config.local);
2899        assert!(config.ipv6);
2900        assert_eq!(config.bootstrap_peers.len(), 1);
2901        assert_eq!(config.connection_timeout, Duration::from_secs(15));
2902        assert_eq!(config.max_connections, 200);
2903        assert_eq!(config.max_message_size, Some(TEST_MAX_MESSAGE_SIZE));
2904        assert!(config.allow_loopback); // auto-enabled by local(true)
2905
2906        Ok(())
2907    }
2908
2909    #[tokio::test]
2910    async fn test_bootstrap_peers() -> Result<()> {
2911        let mut config = create_test_node_config();
2912        config.bootstrap_peers = vec![
2913            crate::MultiAddr::from_ipv4(std::net::Ipv4Addr::LOCALHOST, 9200),
2914            crate::MultiAddr::from_ipv4(std::net::Ipv4Addr::LOCALHOST, 9201),
2915        ];
2916
2917        let node = P2PNode::new(config).await?;
2918
2919        // Start node (which attempts to connect to bootstrap peers)
2920        node.start().await?;
2921
2922        // In a test environment, bootstrap peers may not be available
2923        // The test verifies the node starts correctly with bootstrap configuration
2924        // Peer count may include local/internal tracking, so we just verify it's reasonable
2925        let _peer_count = node.peer_count().await;
2926
2927        node.stop().await?;
2928        Ok(())
2929    }
2930
2931    #[tokio::test]
2932    async fn test_peer_info_structure() {
2933        let peer_info = PeerInfo {
2934            channel_id: "test_peer".to_string(),
2935            addresses: vec!["/ip4/127.0.0.1/tcp/9000".parse::<MultiAddr>().unwrap()],
2936            connected_at: Instant::now(),
2937            last_seen: Instant::now(),
2938            status: ConnectionStatus::Connected,
2939            protocols: vec!["test-protocol".to_string()],
2940            heartbeat_count: 0,
2941        };
2942
2943        assert_eq!(peer_info.channel_id, "test_peer");
2944        assert_eq!(peer_info.addresses.len(), 1);
2945        assert_eq!(peer_info.status, ConnectionStatus::Connected);
2946        assert_eq!(peer_info.protocols.len(), 1);
2947    }
2948
2949    #[tokio::test]
2950    async fn test_serialization() -> Result<()> {
2951        // Test that configs can be serialized/deserialized
2952        let config = create_test_node_config();
2953        let serialized = serde_json::to_string(&config)?;
2954        let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
2955
2956        assert_eq!(config.local, deserialized.local);
2957        assert_eq!(config.port, deserialized.port);
2958        assert_eq!(config.ipv6, deserialized.ipv6);
2959        assert_eq!(config.bootstrap_peers, deserialized.bootstrap_peers);
2960
2961        Ok(())
2962    }
2963
2964    #[tokio::test]
2965    async fn test_get_channel_id_by_address_found() -> Result<()> {
2966        let config = create_test_node_config();
2967        let node = P2PNode::new(config).await?;
2968
2969        // Manually insert a peer for testing
2970        let test_channel_id = "peer_test_123".to_string();
2971        let test_address = "192.168.1.100:9000";
2972        let test_multiaddr = MultiAddr::quic(test_address.parse().unwrap());
2973
2974        let peer_info = PeerInfo {
2975            channel_id: test_channel_id.clone(),
2976            addresses: vec![test_multiaddr],
2977            connected_at: Instant::now(),
2978            last_seen: Instant::now(),
2979            status: ConnectionStatus::Connected,
2980            protocols: vec!["test-protocol".to_string()],
2981            heartbeat_count: 0,
2982        };
2983
2984        node.transport
2985            .inject_peer(test_channel_id.clone(), peer_info)
2986            .await;
2987
2988        // Test: Find channel by address
2989        let lookup_addr = MultiAddr::quic(test_address.parse().unwrap());
2990        let found_channel_id = node.get_channel_id_by_address(&lookup_addr).await;
2991        assert_eq!(found_channel_id, Some(test_channel_id));
2992
2993        Ok(())
2994    }
2995
2996    #[tokio::test]
2997    async fn test_get_channel_id_by_address_not_found() -> Result<()> {
2998        let config = create_test_node_config();
2999        let node = P2PNode::new(config).await?;
3000
3001        // Test: Try to find a channel that doesn't exist
3002        let unknown_addr = MultiAddr::quic("192.168.1.200:9000".parse().unwrap());
3003        let result = node.get_channel_id_by_address(&unknown_addr).await;
3004        assert_eq!(result, None);
3005
3006        Ok(())
3007    }
3008
3009    #[tokio::test]
3010    async fn test_get_channel_id_by_address_invalid_format() -> Result<()> {
3011        let config = create_test_node_config();
3012        let node = P2PNode::new(config).await?;
3013
3014        // Test: Non-IP address should return None (no matching socket addr)
3015        let ble_addr = MultiAddr::new(crate::address::TransportAddr::Ble {
3016            mac: [0x02, 0x00, 0x00, 0x00, 0x00, 0x01],
3017            psm: 0x0025,
3018        });
3019        let result = node.get_channel_id_by_address(&ble_addr).await;
3020        assert_eq!(result, None);
3021
3022        Ok(())
3023    }
3024
3025    #[tokio::test]
3026    async fn test_get_channel_id_by_address_multiple_peers() -> Result<()> {
3027        let config = create_test_node_config();
3028        let node = P2PNode::new(config).await?;
3029
3030        // Add multiple peers with different addresses
3031        let peer1_id = "peer_1".to_string();
3032        let peer1_addr_str = "192.168.1.101:9001";
3033        let peer1_multiaddr = MultiAddr::quic(peer1_addr_str.parse().unwrap());
3034
3035        let peer2_id = "peer_2".to_string();
3036        let peer2_addr_str = "192.168.1.102:9002";
3037        let peer2_multiaddr = MultiAddr::quic(peer2_addr_str.parse().unwrap());
3038
3039        let peer1_info = PeerInfo {
3040            channel_id: peer1_id.clone(),
3041            addresses: vec![peer1_multiaddr],
3042            connected_at: Instant::now(),
3043            last_seen: Instant::now(),
3044            status: ConnectionStatus::Connected,
3045            protocols: vec!["test-protocol".to_string()],
3046            heartbeat_count: 0,
3047        };
3048
3049        let peer2_info = PeerInfo {
3050            channel_id: peer2_id.clone(),
3051            addresses: vec![peer2_multiaddr],
3052            connected_at: Instant::now(),
3053            last_seen: Instant::now(),
3054            status: ConnectionStatus::Connected,
3055            protocols: vec!["test-protocol".to_string()],
3056            heartbeat_count: 0,
3057        };
3058
3059        node.transport
3060            .inject_peer(peer1_id.clone(), peer1_info)
3061            .await;
3062        node.transport
3063            .inject_peer(peer2_id.clone(), peer2_info)
3064            .await;
3065
3066        // Test: Find each channel by their unique address
3067        let found_peer1 = node
3068            .get_channel_id_by_address(&MultiAddr::quic(peer1_addr_str.parse().unwrap()))
3069            .await;
3070        let found_peer2 = node
3071            .get_channel_id_by_address(&MultiAddr::quic(peer2_addr_str.parse().unwrap()))
3072            .await;
3073
3074        assert_eq!(found_peer1, Some(peer1_id));
3075        assert_eq!(found_peer2, Some(peer2_id));
3076
3077        Ok(())
3078    }
3079
3080    #[tokio::test]
3081    async fn test_list_active_connections_empty() -> Result<()> {
3082        let config = create_test_node_config();
3083        let node = P2PNode::new(config).await?;
3084
3085        // Test: No connections initially
3086        let connections = node.list_active_connections().await;
3087        assert!(connections.is_empty());
3088
3089        Ok(())
3090    }
3091
3092    #[tokio::test]
3093    async fn test_list_active_connections_with_peers() -> Result<()> {
3094        let config = create_test_node_config();
3095        let node = P2PNode::new(config).await?;
3096
3097        // Add multiple peers
3098        let peer1_id = "peer_1".to_string();
3099        let peer1_addrs = vec![
3100            MultiAddr::quic("192.168.1.101:9001".parse().unwrap()),
3101            MultiAddr::quic("192.168.1.101:9002".parse().unwrap()),
3102        ];
3103
3104        let peer2_id = "peer_2".to_string();
3105        let peer2_addrs = vec![MultiAddr::quic("192.168.1.102:9003".parse().unwrap())];
3106
3107        let peer1_info = PeerInfo {
3108            channel_id: peer1_id.clone(),
3109            addresses: peer1_addrs.clone(),
3110            connected_at: Instant::now(),
3111            last_seen: Instant::now(),
3112            status: ConnectionStatus::Connected,
3113            protocols: vec!["test-protocol".to_string()],
3114            heartbeat_count: 0,
3115        };
3116
3117        let peer2_info = PeerInfo {
3118            channel_id: peer2_id.clone(),
3119            addresses: peer2_addrs.clone(),
3120            connected_at: Instant::now(),
3121            last_seen: Instant::now(),
3122            status: ConnectionStatus::Connected,
3123            protocols: vec!["test-protocol".to_string()],
3124            heartbeat_count: 0,
3125        };
3126
3127        node.transport
3128            .inject_peer(peer1_id.clone(), peer1_info)
3129            .await;
3130        node.transport
3131            .inject_peer(peer2_id.clone(), peer2_info)
3132            .await;
3133
3134        // Also add to active_connections (list_active_connections iterates over this)
3135        node.transport
3136            .inject_active_connection(peer1_id.clone())
3137            .await;
3138        node.transport
3139            .inject_active_connection(peer2_id.clone())
3140            .await;
3141
3142        // Test: List all active connections
3143        let connections = node.list_active_connections().await;
3144        assert_eq!(connections.len(), 2);
3145
3146        // Verify peer1 and peer2 are in the list
3147        let peer1_conn = connections.iter().find(|(id, _)| id == &peer1_id);
3148        let peer2_conn = connections.iter().find(|(id, _)| id == &peer2_id);
3149
3150        assert!(peer1_conn.is_some());
3151        assert!(peer2_conn.is_some());
3152
3153        // Verify addresses match
3154        assert_eq!(peer1_conn.unwrap().1, peer1_addrs);
3155        assert_eq!(peer2_conn.unwrap().1, peer2_addrs);
3156
3157        Ok(())
3158    }
3159
3160    #[tokio::test]
3161    async fn test_remove_channel_success() -> Result<()> {
3162        let config = create_test_node_config();
3163        let node = P2PNode::new(config).await?;
3164
3165        // Add a peer
3166        let channel_id = "peer_to_remove".to_string();
3167        let channel_peer_id = PeerId::from_name(&channel_id);
3168        let peer_info = PeerInfo {
3169            channel_id: channel_id.clone(),
3170            addresses: vec![MultiAddr::quic("192.168.1.100:9000".parse().unwrap())],
3171            connected_at: Instant::now(),
3172            last_seen: Instant::now(),
3173            status: ConnectionStatus::Connected,
3174            protocols: vec!["test-protocol".to_string()],
3175            heartbeat_count: 0,
3176        };
3177
3178        node.transport
3179            .inject_peer(channel_id.clone(), peer_info)
3180            .await;
3181        node.transport
3182            .inject_peer_to_channel(channel_peer_id, channel_id.clone())
3183            .await;
3184
3185        // Verify peer exists
3186        assert!(node.is_peer_connected(&channel_peer_id).await);
3187
3188        // Remove the channel
3189        let removed = node.remove_channel(&channel_id).await;
3190        assert!(removed);
3191
3192        // Verify peer no longer exists
3193        assert!(!node.is_peer_connected(&channel_peer_id).await);
3194
3195        Ok(())
3196    }
3197
3198    #[tokio::test]
3199    async fn test_remove_channel_nonexistent() -> Result<()> {
3200        let config = create_test_node_config();
3201        let node = P2PNode::new(config).await?;
3202
3203        // Try to remove a channel that doesn't exist
3204        let removed = node.remove_channel("nonexistent_peer").await;
3205        assert!(!removed);
3206
3207        Ok(())
3208    }
3209
3210    #[tokio::test]
3211    async fn test_is_peer_connected() -> Result<()> {
3212        let config = create_test_node_config();
3213        let node = P2PNode::new(config).await?;
3214
3215        let channel_id = "test_peer".to_string();
3216        let channel_peer_id = PeerId::from_name(&channel_id);
3217
3218        // Initially not connected
3219        assert!(!node.is_peer_connected(&channel_peer_id).await);
3220
3221        // Add peer
3222        let peer_info = PeerInfo {
3223            channel_id: channel_id.clone(),
3224            addresses: vec![MultiAddr::quic("192.168.1.100:9000".parse().unwrap())],
3225            connected_at: Instant::now(),
3226            last_seen: Instant::now(),
3227            status: ConnectionStatus::Connected,
3228            protocols: vec!["test-protocol".to_string()],
3229            heartbeat_count: 0,
3230        };
3231
3232        node.transport
3233            .inject_peer(channel_id.clone(), peer_info)
3234            .await;
3235        node.transport
3236            .inject_peer_to_channel(channel_peer_id, channel_id.clone())
3237            .await;
3238
3239        // Now connected
3240        assert!(node.is_peer_connected(&channel_peer_id).await);
3241
3242        // Remove channel
3243        node.remove_channel(&channel_id).await;
3244
3245        // No longer connected
3246        assert!(!node.is_peer_connected(&channel_peer_id).await);
3247
3248        Ok(())
3249    }
3250
3251    #[test]
3252    fn test_normalize_ipv6_wildcard() {
3253        use std::net::{IpAddr, Ipv6Addr, SocketAddr};
3254
3255        let wildcard = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 8080);
3256        let normalized = normalize_wildcard_to_loopback(wildcard);
3257
3258        assert_eq!(normalized.ip(), IpAddr::V6(Ipv6Addr::LOCALHOST));
3259        assert_eq!(normalized.port(), 8080);
3260    }
3261
3262    #[test]
3263    fn test_normalize_ipv4_wildcard() {
3264        use std::net::{IpAddr, Ipv4Addr, SocketAddr};
3265
3266        let wildcard = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9000);
3267        let normalized = normalize_wildcard_to_loopback(wildcard);
3268
3269        assert_eq!(normalized.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
3270        assert_eq!(normalized.port(), 9000);
3271    }
3272
3273    #[test]
3274    fn test_normalize_specific_address_unchanged() {
3275        let specific: std::net::SocketAddr = "192.168.1.100:3000".parse().unwrap();
3276        let normalized = normalize_wildcard_to_loopback(specific);
3277
3278        assert_eq!(normalized, specific);
3279    }
3280
3281    #[test]
3282    fn test_normalize_loopback_unchanged() {
3283        use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
3284
3285        let loopback_v6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5000);
3286        let normalized_v6 = normalize_wildcard_to_loopback(loopback_v6);
3287        assert_eq!(normalized_v6, loopback_v6);
3288
3289        let loopback_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
3290        let normalized_v4 = normalize_wildcard_to_loopback(loopback_v4);
3291        assert_eq!(normalized_v4, loopback_v4);
3292    }
3293
3294    // ---- parse_protocol_message regression tests ----
3295
3296    /// Get current Unix timestamp for tests
3297    fn current_timestamp() -> u64 {
3298        std::time::SystemTime::now()
3299            .duration_since(std::time::UNIX_EPOCH)
3300            .map(|d| d.as_secs())
3301            .unwrap_or(0)
3302    }
3303
3304    /// Helper to create a postcard-serialized WireMessage for tests
3305    fn make_wire_bytes(protocol: &str, data: Vec<u8>, from: &str, timestamp: u64) -> Vec<u8> {
3306        let msg = WireMessage {
3307            protocol: protocol.to_string(),
3308            data,
3309            from: PeerId::from_name(from),
3310            timestamp,
3311            user_agent: String::new(),
3312            public_key: Vec::new(),
3313            signature: Vec::new(),
3314        };
3315        postcard::to_stdvec(&msg).unwrap()
3316    }
3317
3318    #[test]
3319    fn test_parse_protocol_message_uses_transport_peer_id_as_source() {
3320        // Regression: For unsigned messages, P2PEvent::Message.source must be the
3321        // transport peer ID, NOT the "from" field from the wire message.
3322        let transport_id = "abcdef0123456789";
3323        let logical_id = "spoofed-logical-id";
3324        let bytes = make_wire_bytes("test/v1", vec![1, 2, 3], logical_id, current_timestamp());
3325
3326        let parsed =
3327            parse_protocol_message(&bytes, transport_id).expect("valid message should parse");
3328
3329        // Unsigned message: no authenticated node ID
3330        assert!(parsed.authenticated_node_id.is_none());
3331
3332        match parsed.event {
3333            P2PEvent::Message {
3334                topic,
3335                source,
3336                transport_source,
3337                data,
3338            } => {
3339                assert!(source.is_none(), "unsigned message source must be None");
3340                assert!(
3341                    transport_source.is_none(),
3342                    "non-socket transport source should not produce an IP transport address"
3343                );
3344                assert_eq!(topic, "test/v1");
3345                assert_eq!(data, vec![1u8, 2, 3]);
3346            }
3347            other => panic!("expected P2PEvent::Message, got {:?}", other),
3348        }
3349    }
3350
3351    #[test]
3352    fn test_parse_protocol_message_rejects_invalid_bytes() {
3353        // Random bytes that are not valid bincode should be rejected
3354        assert!(parse_protocol_message(b"not valid bincode", "peer-id").is_none());
3355    }
3356
3357    #[test]
3358    fn test_parse_protocol_message_rejects_truncated_message() {
3359        // A truncated bincode message should fail to deserialize
3360        let full_bytes = make_wire_bytes("test/v1", vec![1, 2, 3], "sender", current_timestamp());
3361        let truncated = &full_bytes[..full_bytes.len() / 2];
3362        assert!(parse_protocol_message(truncated, "peer-id").is_none());
3363    }
3364
3365    #[test]
3366    fn test_parse_protocol_message_empty_payload() {
3367        let bytes = make_wire_bytes("ping", vec![], "sender", current_timestamp());
3368
3369        let parsed = parse_protocol_message(&bytes, "transport-peer")
3370            .expect("valid message with empty data should parse");
3371
3372        match parsed.event {
3373            P2PEvent::Message { data, .. } => assert!(data.is_empty()),
3374            other => panic!("expected P2PEvent::Message, got {:?}", other),
3375        }
3376    }
3377
3378    #[test]
3379    fn test_parse_protocol_message_records_ip_transport_source() {
3380        let bytes = make_wire_bytes("ping", vec![1], "sender", current_timestamp());
3381
3382        let parsed =
3383            parse_protocol_message(&bytes, "192.168.1.2:4567").expect("valid message should parse");
3384
3385        match parsed.event {
3386            P2PEvent::Message {
3387                transport_source, ..
3388            } => {
3389                assert_eq!(
3390                    transport_source,
3391                    Some(MultiAddr::quic("192.168.1.2:4567".parse().unwrap()))
3392                );
3393            }
3394            other => panic!("expected P2PEvent::Message, got {:?}", other),
3395        }
3396    }
3397
3398    #[test]
3399    fn test_parse_protocol_message_preserves_binary_payload() {
3400        // Verify that arbitrary byte values (including 0xFF, 0x00) survive round-trip
3401        let payload: Vec<u8> = (0..=255).collect();
3402        let bytes = make_wire_bytes("binary/v1", payload.clone(), "sender", current_timestamp());
3403
3404        let parsed = parse_protocol_message(&bytes, "peer-id")
3405            .expect("valid message with full byte range should parse");
3406
3407        match parsed.event {
3408            P2PEvent::Message { data, topic, .. } => {
3409                assert_eq!(topic, "binary/v1");
3410                assert_eq!(
3411                    data, payload,
3412                    "payload must survive bincode round-trip exactly"
3413                );
3414            }
3415            other => panic!("expected P2PEvent::Message, got {:?}", other),
3416        }
3417    }
3418
3419    #[test]
3420    fn test_parse_signed_message_verifies_and_uses_node_id() {
3421        let identity = NodeIdentity::generate().expect("should generate identity");
3422        let protocol = "test/signed";
3423        let data: Vec<u8> = vec![10, 20, 30];
3424        // The `from` field must match the PeerId derived from the public key.
3425        let from = *identity.peer_id();
3426        let timestamp = current_timestamp();
3427        let user_agent = "test/1.0";
3428
3429        // Compute signable bytes the same way create_protocol_message does
3430        let signable =
3431            postcard::to_stdvec(&(protocol, data.as_slice(), &from, timestamp, user_agent))
3432                .unwrap();
3433        let sig = identity.sign(&signable).expect("signing should succeed");
3434
3435        let msg = WireMessage {
3436            protocol: protocol.to_string(),
3437            data: data.clone(),
3438            from,
3439            timestamp,
3440            user_agent: user_agent.to_string(),
3441            public_key: identity.public_key().as_bytes().to_vec(),
3442            signature: sig.as_bytes().to_vec(),
3443        };
3444        let bytes = postcard::to_stdvec(&msg).unwrap();
3445
3446        let parsed =
3447            parse_protocol_message(&bytes, "transport-xyz").expect("signed message should parse");
3448
3449        let expected_peer_id = *identity.peer_id();
3450        assert_eq!(
3451            parsed.authenticated_node_id.as_ref(),
3452            Some(&expected_peer_id)
3453        );
3454
3455        match parsed.event {
3456            P2PEvent::Message { source, .. } => {
3457                assert_eq!(
3458                    source.as_ref(),
3459                    Some(&expected_peer_id),
3460                    "source should be the verified PeerId"
3461                );
3462            }
3463            other => panic!("expected P2PEvent::Message, got {:?}", other),
3464        }
3465    }
3466
3467    #[test]
3468    fn test_parse_message_with_bad_signature_is_rejected() {
3469        let identity = NodeIdentity::generate().expect("should generate identity");
3470        let protocol = "test/bad-sig";
3471        let data: Vec<u8> = vec![1, 2, 3];
3472        let from = *identity.peer_id();
3473        let timestamp = current_timestamp();
3474        let user_agent = "test/1.0";
3475
3476        // Sign correct signable bytes
3477        let signable =
3478            postcard::to_stdvec(&(protocol, data.as_slice(), &from, timestamp, user_agent))
3479                .unwrap();
3480        let sig = identity.sign(&signable).expect("signing should succeed");
3481
3482        // Tamper with the data (signature was over [1,2,3], not [99,99,99])
3483        let msg = WireMessage {
3484            protocol: protocol.to_string(),
3485            data: vec![99, 99, 99],
3486            from,
3487            timestamp,
3488            user_agent: user_agent.to_string(),
3489            public_key: identity.public_key().as_bytes().to_vec(),
3490            signature: sig.as_bytes().to_vec(),
3491        };
3492        let bytes = postcard::to_stdvec(&msg).unwrap();
3493
3494        assert!(
3495            parse_protocol_message(&bytes, "transport-xyz").is_none(),
3496            "message with bad signature should be rejected"
3497        );
3498    }
3499
3500    #[test]
3501    fn test_parse_message_with_mismatched_from_is_rejected() {
3502        let identity = NodeIdentity::generate().expect("should generate identity");
3503        let protocol = "test/from-mismatch";
3504        let data: Vec<u8> = vec![1, 2, 3];
3505        // Use a `from` field that does NOT match the public key's PeerId.
3506        let fake_from = PeerId::from_bytes([0xDE; 32]);
3507        let timestamp = current_timestamp();
3508        let user_agent = "test/1.0";
3509
3510        let signable =
3511            postcard::to_stdvec(&(protocol, data.as_slice(), &fake_from, timestamp, user_agent))
3512                .unwrap();
3513        let sig = identity.sign(&signable).expect("signing should succeed");
3514
3515        let msg = WireMessage {
3516            protocol: protocol.to_string(),
3517            data,
3518            from: fake_from,
3519            timestamp,
3520            user_agent: user_agent.to_string(),
3521            public_key: identity.public_key().as_bytes().to_vec(),
3522            signature: sig.as_bytes().to_vec(),
3523        };
3524        let bytes = postcard::to_stdvec(&msg).unwrap();
3525
3526        assert!(
3527            parse_protocol_message(&bytes, "transport-xyz").is_none(),
3528            "message with mismatched from field should be rejected"
3529        );
3530    }
3531}