// saorsa_core/network.rs
1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: david@saorsalabs.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! Network module
15//!
16//! This module provides core networking functionality for the P2P Foundation.
17//! It handles peer connections, network events, and node lifecycle management.
18
19use crate::PeerId;
20use crate::adaptive::trust::{TrustRecord, TrustSnapshot};
21use crate::adaptive::{AdaptiveDHT, AdaptiveDhtConfig, TrustEngine, TrustEvent};
22use crate::bootstrap::cache::{CachedCloseGroupPeer, CloseGroupCache};
23use crate::bootstrap::{BootstrapConfig, BootstrapManager};
24use crate::dht::core_engine::AddressType;
25use crate::dht_network_manager::{
26    DhtNetworkConfig, DhtNetworkEvent, DhtNetworkManager, IDENTITY_EXCHANGE_TIMEOUT,
27};
28use crate::error::{IdentityError, NetworkError, P2PError, P2pResult as Result};
29use crate::reachability::spawn_acquisition_driver;
30
31use crate::MultiAddr;
32use crate::identity::node_identity::{NodeIdentity, peer_id_from_public_key};
33use crate::quantum_crypto::saorsa_transport_integration::{MlDsaPublicKey, MlDsaSignature};
34use dashmap::DashMap;
35use futures::StreamExt;
36use parking_lot::Mutex as ParkingMutex;
37use serde::{Deserialize, Serialize};
38use std::collections::HashMap;
39use std::net::SocketAddr;
40use std::path::{Path, PathBuf};
41use std::sync::Arc;
42use std::sync::atomic::{AtomicBool, Ordering};
43use std::time::{Duration, SystemTime, UNIX_EPOCH};
44use tokio::sync::{Mutex as TokioMutex, RwLock, broadcast};
45use tokio::time::Instant;
46use tokio_util::sync::CancellationToken;
47use tracing::{debug, info, trace, warn};
48
/// Wire protocol message format for P2P communication.
///
/// Serialized with postcard for compact binary encoding.
///
/// NOTE(review): field order is part of the wire format with postcard —
/// do not reorder fields without a protocol version bump.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct WireMessage {
    /// Protocol/topic identifier
    pub(crate) protocol: String,
    /// Raw payload bytes
    pub(crate) data: Vec<u8>,
    /// Sender's peer ID (verified against transport-level identity)
    pub(crate) from: PeerId,
    /// Unix timestamp in seconds
    pub(crate) timestamp: u64,
    /// User agent string identifying the sender's software.
    ///
    /// Convention: `"node/<version>"` for full DHT participants,
    /// `"client/<version>"` or `"<app>/<version>"` for ephemeral clients.
    /// Included in the signed bytes — tamper-proof.
    #[serde(default)]
    pub(crate) user_agent: String,
    /// Sender's ML-DSA-65 public key (1952 bytes). Empty if unsigned.
    #[serde(default)]
    pub(crate) public_key: Vec<u8>,
    /// ML-DSA-65 signature over the signable bytes. Empty if unsigned.
    #[serde(default)]
    pub(crate) signature: Vec<u8>,
}
76
/// Operating mode of a P2P node.
///
/// Determines the default user agent and DHT participation behavior.
/// `Node` peers participate in the DHT routing table; `Client` peers
/// are treated as ephemeral and excluded from routing.
///
/// Defaults to [`NodeMode::Node`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum NodeMode {
    /// Full DHT-participant node that maintains routing state and routes messages.
    #[default]
    Node,
    /// Ephemeral client that connects to perform operations without joining the DHT.
    Client,
}
90
/// Internal listen mode controlling which network interfaces the node binds to.
///
/// Selected from [`NodeConfig::local`] and consumed by `build_listen_addrs`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ListenMode {
    /// Bind to all interfaces (`0.0.0.0` / `::`).
    Public,
    /// Bind to loopback only (`127.0.0.1` / `::1`).
    Local,
}
99
100/// Returns the default user agent string for the given mode.
101///
102/// - `Node` → `"node/<saorsa-core-version>"`
103/// - `Client` → `"client/<saorsa-core-version>"`
104pub fn user_agent_for_mode(mode: NodeMode) -> String {
105    let prefix = match mode {
106        NodeMode::Node => "node",
107        NodeMode::Client => "client",
108    };
109    format!("{prefix}/{}", env!("CARGO_PKG_VERSION"))
110}
111
/// Returns `true` if the user agent identifies a full DHT participant (prefix `"node/"`).
pub fn is_dht_participant(user_agent: &str) -> bool {
    user_agent.strip_prefix("node/").is_some()
}
116
/// Capacity of the internal channel used by the message receiving system.
pub(crate) const MESSAGE_RECV_CHANNEL_CAPACITY: usize = 256;

/// Maximum number of concurrent in-flight request/response operations.
pub(crate) const MAX_ACTIVE_REQUESTS: usize = 256;

/// Maximum allowed timeout for a single request (5 minutes).
pub(crate) const MAX_REQUEST_TIMEOUT: Duration = Duration::from_secs(300);

/// Default listen port for the P2P node.
///
/// Used by `NodeConfig::default()`. Note that `NodeConfigBuilder::default()`
/// instead defaults to port `0` (OS-assigned) — the two paths intentionally
/// differ.
const DEFAULT_LISTEN_PORT: u16 = 9000;

/// Default maximum number of concurrent connections.
const DEFAULT_MAX_CONNECTIONS: usize = 10_000;

/// Default connection timeout in seconds.
///
/// The transport adapter keeps each direct Happy Eyeballs attempt short so
/// DHT lookups can move past offline peers quickly. 25s leaves room for
/// multi-stage connection strategies and identity exchange while preserving
/// the historical API default.
const DEFAULT_CONNECTION_TIMEOUT_SECS: u64 = 25;

/// Number of cached bootstrap peers to retrieve.
const BOOTSTRAP_PEER_BATCH_SIZE: usize = 20;

/// Timeout in seconds for waiting on a bootstrap peer's identity exchange.
///
/// Tighter than the post-bootstrap budget (`IDENTITY_EXCHANGE_TIMEOUT`,
/// 5 s) on purpose: bootstrap candidates are unverified and a stuck one
/// must not be allowed to head-of-line block convergence. 3 s covers
/// loopback (<100 ms) and direct WAN paths (~1–2 s with one handshake
/// retry); a relay-tunnelled path with congested ML-DSA verification
/// can exceed this and will fail identity exchange, but the bootstrap
/// manager simply moves on to other candidates rather than retrying the
/// same one.
///
/// `wait_for_peer_identity` short-circuits on channel close, so most dead
/// channels surface in microseconds regardless of this budget.
const BOOTSTRAP_IDENTITY_TIMEOUT_SECS: u64 = 3;

/// Maximum number of bootstrap peers dialed concurrently in Phase B.
///
/// Bounds the fan-out of CLI + cached bootstrap dials so simultaneous
/// QUIC+PQC handshakes don't spike CPU or saturate the UDP socket. Chosen
/// low on purpose: each dial runs a full ML-KEM key exchange and ML-DSA
/// verification, and a cold-start node has no spare compute budget.
const MAX_CONCURRENT_BOOTSTRAP_DIALS: usize = 4;

/// Number of successful bootstrap connections after which a client-mode
/// node stops dialing further candidates.
///
/// Clients only need enough peers to route their own lookups (α=3 parallel
/// queries → 6 gives ~2× redundancy) and don't serve the DHT, so a fully
/// populated close-group buys them nothing. Stopping early cuts cold-start
/// latency by skipping the tail of slow / dead candidates. Nodes always
/// dial every candidate so their routing table converges fully.
const CLIENT_BOOTSTRAP_TARGET: usize = 6;

/// Serde helper — returns `true`.
///
/// Referenced by name in `#[serde(default = "default_true")]` (see
/// `NodeConfig::ipv6`) so the field defaults to `true` when absent.
const fn default_true() -> bool {
    true
}
180
/// Configuration for a P2P node
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeConfig {
    /// Bind to loopback only (`127.0.0.1` / `::1`).
    ///
    /// When `true`, the node listens on loopback addresses suitable for
    /// local development and testing. When `false` (the default), the node
    /// listens on all interfaces (`0.0.0.0` / `::`).
    #[serde(default)]
    pub local: bool,

    /// Listen port. `0` means OS-assigned ephemeral port.
    #[serde(default)]
    pub port: u16,

    /// Enable IPv6 dual-stack binding.
    ///
    /// When `true` (the default), both an IPv4 and an IPv6 address are
    /// bound. When `false`, only IPv4 is used.
    #[serde(default = "default_true")]
    pub ipv6: bool,

    /// Bootstrap peers to connect to on startup.
    pub bootstrap_peers: Vec<crate::MultiAddr>,

    // MCP removed; will be redesigned later
    /// Connection timeout duration
    pub connection_timeout: Duration,

    /// Maximum number of concurrent connections
    pub max_connections: usize,

    /// DHT configuration
    pub dht_config: DHTConfig,

    /// Bootstrap cache configuration
    pub bootstrap_cache_config: Option<BootstrapConfig>,

    /// Optional IP diversity configuration for Sybil protection tuning.
    ///
    /// When set, this configuration is used by bootstrap peer discovery and
    /// other diversity-enforcing subsystems. If `None`, defaults are used.
    pub diversity_config: Option<crate::security::IPDiversityConfig>,

    /// Optional override for the maximum application-layer message size.
    ///
    /// When `None`, the underlying saorsa-transport default is used.
    #[serde(default)]
    pub max_message_size: Option<usize>,

    /// Optional node identity for app-level message signing.
    ///
    /// When set, outgoing messages are signed with the node's ML-DSA-65 key
    /// and incoming signed messages are verified at the transport layer.
    /// Skipped by serde — identities are never (de)serialized with config.
    #[serde(skip)]
    pub node_identity: Option<Arc<NodeIdentity>>,

    /// Operating mode of this node.
    ///
    /// Determines the default user agent and DHT participation:
    /// - `Node` → user agent `"node/<version>"`, added to DHT routing tables.
    /// - `Client` → user agent `"client/<version>"`, treated as ephemeral.
    #[serde(default)]
    pub mode: NodeMode,

    /// Optional custom user agent override.
    ///
    /// When `Some`, this value is used instead of the mode-derived default.
    /// When `None`, the user agent is derived from [`NodeConfig::mode`].
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub custom_user_agent: Option<String>,

    /// Allow loopback addresses (127.0.0.1, ::1) in the transport layer.
    ///
    /// In production, loopback addresses are rejected because they are not
    /// routable. Enable this for local devnets and testnets where all nodes
    /// run on the same machine.
    ///
    /// Default: `false`
    #[serde(default)]
    pub allow_loopback: bool,

    /// Adaptive DHT configuration (trust-based swap-out).
    ///
    /// Controls whether peers with low trust scores are eligible for
    /// swap-out from the routing table when better candidates arrive. Use
    /// `NodeConfigBuilder::trust_enforcement` for a simple on/off toggle.
    ///
    /// Default: enabled with a swap threshold of 0.35.
    #[serde(default)]
    pub adaptive_dht_config: AdaptiveDhtConfig,

    /// Optional directory for persisting the close group cache.
    ///
    /// When set, the node saves its close group peers and their trust
    /// scores to `{dir}/close_group_cache.json` on shutdown and after
    /// bootstrap. On startup, cached peers are loaded and contacted
    /// first, preserving close group consistency across restarts.
    ///
    /// When `None`, no close group cache is used.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub close_group_cache_dir: Option<PathBuf>,
}
286
/// DHT-specific configuration
///
/// Values are checked by [`DHTConfig::validate`] before use.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DHTConfig {
    /// Kademlia K parameter (bucket size)
    pub k_value: usize,

    /// Kademlia alpha parameter (parallelism)
    pub alpha_value: usize,

    /// DHT refresh interval
    pub refresh_interval: Duration,
}
299
300// ============================================================================
301// Address Construction Helpers
302// ============================================================================
303
304/// Build QUIC listen addresses based on port, IPv6 preference, and listen mode.
305///
306/// All returned addresses use the QUIC transport — the only transport
307/// currently supported for dialing. When additional transports are added,
308/// extend this function to produce addresses for those transports as well.
309///
310/// `ListenMode::Public` uses unspecified (all-interface) addresses;
311/// `ListenMode::Local` uses loopback addresses.
312#[inline]
313fn build_listen_addrs(port: u16, ipv6_enabled: bool, mode: ListenMode) -> Vec<MultiAddr> {
314    let mut addrs = Vec::with_capacity(if ipv6_enabled { 2 } else { 1 });
315
316    let (v4, v6) = match mode {
317        ListenMode::Public => (
318            std::net::Ipv4Addr::UNSPECIFIED,
319            std::net::Ipv6Addr::UNSPECIFIED,
320        ),
321        ListenMode::Local => (std::net::Ipv4Addr::LOCALHOST, std::net::Ipv6Addr::LOCALHOST),
322    };
323
324    if ipv6_enabled {
325        addrs.push(MultiAddr::quic(std::net::SocketAddr::new(
326            std::net::IpAddr::V6(v6),
327            port,
328        )));
329    }
330
331    addrs.push(MultiAddr::quic(std::net::SocketAddr::new(
332        std::net::IpAddr::V4(v4),
333        port,
334    )));
335
336    addrs
337}
338
339impl NodeConfig {
340    /// Returns the effective user agent string.
341    ///
342    /// If a custom user agent was set, returns that. Otherwise, derives
343    /// the user agent from the node's [`NodeMode`].
344    pub fn user_agent(&self) -> String {
345        self.custom_user_agent
346            .clone()
347            .unwrap_or_else(|| user_agent_for_mode(self.mode))
348    }
349
350    /// Compute the listen addresses from the configuration fields.
351    ///
352    /// The returned addresses are derived from [`local`](Self::local),
353    /// [`port`](Self::port), and [`ipv6`](Self::ipv6).
354    pub fn listen_addrs(&self) -> Vec<MultiAddr> {
355        let mode = if self.local {
356            ListenMode::Local
357        } else {
358            ListenMode::Public
359        };
360        build_listen_addrs(self.port, self.ipv6, mode)
361    }
362
363    /// Create a new NodeConfig with default values
364    ///
365    /// # Errors
366    ///
367    /// Returns an error if default addresses cannot be parsed
368    pub fn new() -> Result<Self> {
369        Ok(Self::default())
370    }
371
372    /// Create a builder for customized NodeConfig construction
373    pub fn builder() -> NodeConfigBuilder {
374        NodeConfigBuilder::default()
375    }
376}
377
378// ============================================================================
379// NodeConfig Builder Pattern
380// ============================================================================
381
/// Builder for constructing [`NodeConfig`] with a transport-aware fluent API.
///
/// Defaults are chosen for quick local development:
/// - QUIC on a random free port (`0`)
/// - IPv6 enabled (dual-stack)
/// - All interfaces (not local-only)
///
/// # Examples
///
/// ```rust,ignore
/// // Simplest — QUIC on random port, IPv6 on, all interfaces
/// let config = NodeConfig::builder().build()?;
///
/// // Local dev/test mode (loopback, auto-enables allow_loopback)
/// let config = NodeConfig::builder()
///     .local(true)
///     .build()?;
/// ```
#[derive(Debug, Clone)]
pub struct NodeConfigBuilder {
    // Listen port; 0 = OS-assigned.
    port: u16,
    // IPv6 dual-stack on/off.
    ipv6: bool,
    // Loopback-only binding.
    local: bool,
    // Bootstrap peers accumulated via `bootstrap_peer`.
    bootstrap_peers: Vec<crate::MultiAddr>,
    // `None` fields fall back to module defaults in `build()`.
    max_connections: Option<usize>,
    connection_timeout: Option<Duration>,
    dht_config: Option<DHTConfig>,
    max_message_size: Option<usize>,
    mode: NodeMode,
    custom_user_agent: Option<String>,
    // `None` means "derive from `local`" at build time.
    allow_loopback: Option<bool>,
    adaptive_dht_config: Option<AdaptiveDhtConfig>,
    close_group_cache_dir: Option<PathBuf>,
}
416
impl Default for NodeConfigBuilder {
    /// Builder defaults: random free port, IPv6 on, all interfaces,
    /// everything else deferred to `build()` / module defaults.
    fn default() -> Self {
        Self {
            port: 0,
            ipv6: true,
            local: false,
            bootstrap_peers: Vec::new(),
            max_connections: None,
            connection_timeout: None,
            dht_config: None,
            max_message_size: None,
            mode: NodeMode::default(),
            custom_user_agent: None,
            allow_loopback: None,
            adaptive_dht_config: None,
            close_group_cache_dir: None,
        }
    }
}
436
437impl NodeConfigBuilder {
438    /// Set the listen port. Default: `0` (random free port).
439    pub fn port(mut self, port: u16) -> Self {
440        self.port = port;
441        self
442    }
443
444    /// Enable or disable IPv6 dual-stack. Default: `true`.
445    pub fn ipv6(mut self, enabled: bool) -> Self {
446        self.ipv6 = enabled;
447        self
448    }
449
450    /// Bind to loopback only (`true`) or all interfaces (`false`).
451    ///
452    /// When `true`, automatically enables `allow_loopback` unless explicitly
453    /// overridden via [`Self::allow_loopback`].
454    ///
455    /// Default: `false` (all interfaces).
456    pub fn local(mut self, local: bool) -> Self {
457        self.local = local;
458        self
459    }
460
461    /// Add a bootstrap peer.
462    pub fn bootstrap_peer(mut self, addr: crate::MultiAddr) -> Self {
463        self.bootstrap_peers.push(addr);
464        self
465    }
466
467    /// Set maximum connections.
468    pub fn max_connections(mut self, max: usize) -> Self {
469        self.max_connections = Some(max);
470        self
471    }
472
473    /// Set connection timeout.
474    pub fn connection_timeout(mut self, timeout: Duration) -> Self {
475        self.connection_timeout = Some(timeout);
476        self
477    }
478
479    /// Set DHT configuration.
480    pub fn dht_config(mut self, config: DHTConfig) -> Self {
481        self.dht_config = Some(config);
482        self
483    }
484
485    /// Set maximum application-layer message size in bytes.
486    ///
487    /// If this method is not called, saorsa-transport's built-in default is used.
488    pub fn max_message_size(mut self, max_message_size: usize) -> Self {
489        self.max_message_size = Some(max_message_size);
490        self
491    }
492
493    /// Set the operating mode (Node or Client).
494    pub fn mode(mut self, mode: NodeMode) -> Self {
495        self.mode = mode;
496        self
497    }
498
499    /// Set a custom user agent string, overriding the mode-derived default.
500    pub fn custom_user_agent(mut self, user_agent: impl Into<String>) -> Self {
501        self.custom_user_agent = Some(user_agent.into());
502        self
503    }
504
505    /// Explicitly control whether loopback addresses are allowed in the
506    /// transport layer. When not called, `local(true)` auto-enables this;
507    /// `local(false)` defaults to `false`.
508    pub fn allow_loopback(mut self, allow: bool) -> Self {
509        self.allow_loopback = Some(allow);
510        self
511    }
512
513    /// Enable or disable trust-based peer swap-out.
514    ///
515    /// When `false`, peers are never swapped out of the routing table
516    /// based on trust scores. Trust scores are still tracked but have
517    /// no enforcement effect.
518    ///
519    /// When `true` (the default), peers whose trust score falls below the
520    /// swap threshold (0.35) become eligible for replacement when a
521    /// better candidate arrives.
522    ///
523    /// For fine-grained control over the threshold, use
524    /// [`adaptive_dht_config`](Self::adaptive_dht_config) instead.
525    pub fn trust_enforcement(mut self, enabled: bool) -> Self {
526        let threshold = if enabled {
527            AdaptiveDhtConfig::default().swap_threshold
528        } else {
529            0.0
530        };
531        self.adaptive_dht_config = Some(AdaptiveDhtConfig {
532            swap_threshold: threshold,
533        });
534        self
535    }
536
537    /// Set the full adaptive DHT configuration.
538    ///
539    /// Overrides any previous call to [`trust_enforcement`](Self::trust_enforcement).
540    pub fn adaptive_dht_config(mut self, config: AdaptiveDhtConfig) -> Self {
541        self.adaptive_dht_config = Some(config);
542        self
543    }
544
545    /// Set the directory for persisting the close group cache.
546    ///
547    /// The node writes `close_group_cache.json` inside this directory on
548    /// shutdown and after bootstrap, and loads it on startup.
549    pub fn close_group_cache_dir(mut self, path: impl Into<PathBuf>) -> Self {
550        self.close_group_cache_dir = Some(path.into());
551        self
552    }
553
554    /// Build the [`NodeConfig`].
555    ///
556    /// # Errors
557    ///
558    /// Returns an error if address construction fails.
559    pub fn build(self) -> Result<NodeConfig> {
560        // local mode auto-enables allow_loopback unless explicitly overridden
561        let allow_loopback = self.allow_loopback.unwrap_or(self.local);
562
563        Ok(NodeConfig {
564            local: self.local,
565            port: self.port,
566            ipv6: self.ipv6,
567            bootstrap_peers: self.bootstrap_peers,
568            connection_timeout: self
569                .connection_timeout
570                .unwrap_or(Duration::from_secs(DEFAULT_CONNECTION_TIMEOUT_SECS)),
571            max_connections: self.max_connections.unwrap_or(DEFAULT_MAX_CONNECTIONS),
572            dht_config: self.dht_config.unwrap_or_default(),
573            bootstrap_cache_config: None,
574            diversity_config: None,
575            max_message_size: self.max_message_size,
576            node_identity: None,
577            mode: self.mode,
578            custom_user_agent: self.custom_user_agent,
579            allow_loopback,
580            adaptive_dht_config: self.adaptive_dht_config.unwrap_or_default(),
581            close_group_cache_dir: self.close_group_cache_dir,
582        })
583    }
584}
585
impl Default for NodeConfig {
    /// Production-oriented defaults.
    ///
    /// Note: defaults here use the fixed `DEFAULT_LISTEN_PORT` (9000),
    /// whereas `NodeConfigBuilder::default()` uses port `0` (OS-assigned).
    /// The divergence is intentional — keep both in mind when debugging
    /// port selection.
    fn default() -> Self {
        Self {
            local: false,
            port: DEFAULT_LISTEN_PORT,
            ipv6: true,
            bootstrap_peers: Vec::new(),
            connection_timeout: Duration::from_secs(DEFAULT_CONNECTION_TIMEOUT_SECS),
            max_connections: DEFAULT_MAX_CONNECTIONS,
            dht_config: DHTConfig::default(),
            bootstrap_cache_config: None,
            diversity_config: None,
            max_message_size: None,
            node_identity: None,
            mode: NodeMode::default(),
            custom_user_agent: None,
            allow_loopback: false,
            adaptive_dht_config: AdaptiveDhtConfig::default(),
            close_group_cache_dir: None,
        }
    }
}
608
609impl DHTConfig {
610    /// Default K value (bucket size) for Kademlia routing.
611    pub const DEFAULT_K_VALUE: usize = 20;
612    const DEFAULT_ALPHA_VALUE: usize = 3;
613    const DEFAULT_REFRESH_INTERVAL_SECS: u64 = 600;
614    /// Minimum k_value — values below this produce degenerate routing behavior.
615    const MIN_K_VALUE: usize = 4;
616
617    /// Validate parameter safety constraints (Section 4 points 1-13).
618    ///
619    /// Returns `Err` if any constraint is violated.
620    pub fn validate(&self) -> Result<()> {
621        if self.k_value < Self::MIN_K_VALUE {
622            return Err(P2PError::Validation(
623                format!(
624                    "k_value must be >= {} (got {}), values below {} produce degenerate behavior",
625                    Self::MIN_K_VALUE,
626                    self.k_value,
627                    Self::MIN_K_VALUE,
628                )
629                .into(),
630            ));
631        }
632        if self.alpha_value < 1 {
633            return Err(P2PError::Validation(
634                format!("alpha_value must be >= 1 (got {})", self.alpha_value).into(),
635            ));
636        }
637        if self.refresh_interval.is_zero() {
638            return Err(P2PError::Validation("refresh_interval must be > 0".into()));
639        }
640        Ok(())
641    }
642}
643
impl Default for DHTConfig {
    /// Standard Kademlia parameters: K=20, α=3, 10-minute refresh.
    fn default() -> Self {
        Self {
            k_value: Self::DEFAULT_K_VALUE,
            alpha_value: Self::DEFAULT_ALPHA_VALUE,
            refresh_interval: Duration::from_secs(Self::DEFAULT_REFRESH_INTERVAL_SECS),
        }
    }
}
653
/// Information about a connected peer
#[derive(Debug, Clone)]
pub struct PeerInfo {
    /// Transport-level channel identifier (internal use only).
    #[allow(dead_code)]
    pub(crate) channel_id: String,

    /// Peer's addresses
    pub addresses: Vec<MultiAddr>,

    /// Connection timestamp (tokio [`Instant`], monotonic — not wall clock)
    pub connected_at: Instant,

    /// Last seen timestamp (same monotonic clock as `connected_at`)
    pub last_seen: Instant,

    /// Connection status
    pub status: ConnectionStatus,

    /// Supported protocols
    pub protocols: Vec<String>,

    /// Number of heartbeats received
    pub heartbeat_count: u64,
}
679
/// Connection status for a peer
///
/// Lifecycle: `Connecting` → `Connected` → `Disconnecting` → `Disconnected`,
/// with `Failed` as a terminal error state carrying a human-readable reason.
#[derive(Debug, Clone, PartialEq)]
pub enum ConnectionStatus {
    /// Connection is being established
    Connecting,
    /// Connection is established and active
    Connected,
    /// Connection is being closed
    Disconnecting,
    /// Connection is closed
    Disconnected,
    /// Connection failed; the payload describes why
    Failed(String),
}
694
/// Network events that can occur in the P2P system
///
/// Events are broadcast to all listeners and provide real-time
/// notifications of network state changes and message arrivals.
#[derive(Debug, Clone)]
pub enum P2PEvent {
    /// Message received from a peer on a specific topic
    Message {
        /// Topic or channel the message was sent on
        topic: String,
        /// For signed messages this is the authenticated app-level [`PeerId`];
        /// `None` for unsigned messages.
        source: Option<PeerId>,
        /// Raw message data payload
        data: Vec<u8>,
    },
    /// An authenticated peer has connected (first signed message verified on any channel).
    /// The `user_agent` identifies the remote software (e.g. `"node/0.12.1"`, `"client/1.0"`).
    PeerConnected(PeerId, String),
    /// An authenticated peer has fully disconnected (all channels closed).
    PeerDisconnected(PeerId),
}
717
/// Response from a peer to a request sent via [`P2PNode::send_request`].
///
/// Contains the response payload along with metadata about the responder
/// and round-trip latency.
#[derive(Debug, Clone)]
pub struct PeerResponse {
    /// The peer that sent the response.
    pub peer_id: PeerId,
    /// Raw response payload bytes.
    pub data: Vec<u8>,
    /// Round-trip latency from request to response.
    pub latency: Duration,
}
731
/// Wire format for request/response correlation.
///
/// Wraps application payloads with a message ID and direction flag
/// so the receive loop can route responses back to waiting callers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct RequestResponseEnvelope {
    /// Unique identifier to correlate request ↔ response.
    pub(crate) message_id: String,
    /// `false` for requests, `true` for responses.
    pub(crate) is_response: bool,
    /// Application payload (opaque to the correlation layer).
    pub(crate) payload: Vec<u8>,
}
745
/// An in-flight request awaiting a response from a specific peer.
pub(crate) struct PendingRequest {
    /// Oneshot sender for delivering the response payload.
    /// Dropping it without sending signals the waiter that no response came.
    pub(crate) response_tx: tokio::sync::oneshot::Sender<Vec<u8>>,
    /// The peer we expect the response from (for origin validation).
    pub(crate) expected_peer: PeerId,
}
753
/// Short grace period after closing stale QUIC connections before re-dialing.
///
/// `disconnect_channel` is async and waits for the QUIC close, but the
/// transport endpoint may need a moment to fully release internal state.
/// Only applied when stale channels were actually disconnected.
const QUIC_TEARDOWN_GRACE: Duration = Duration::from_millis(100);
760
/// Main P2P network node that manages connections, routing, and communication
///
/// This struct represents a complete P2P network participant that can:
/// - Connect to other peers via QUIC transport
/// - Participate in distributed hash table (DHT) operations
/// - Send and receive messages through various protocols
/// - Handle network events and peer lifecycle
///
/// Transport concerns (connections, messaging, events) are delegated to
/// `TransportHandle`.
pub struct P2PNode {
    /// Node configuration
    config: NodeConfig,

    /// Our peer ID (derived from the node's cryptographic identity)
    peer_id: PeerId,

    /// Transport handle owning all QUIC / peer / event state
    transport: Arc<crate::transport_handle::TransportHandle>,

    /// Node start time (monotonic, for uptime reporting)
    start_time: Instant,

    /// Shutdown token — cancelled when the node should stop
    shutdown: CancellationToken,

    /// Adaptive DHT layer — owns both the DHT manager and the trust engine.
    /// All DHT operations and trust signals go through this component.
    adaptive_dht: AdaptiveDHT,

    /// Bootstrap cache manager for peer discovery
    bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,

    /// Bootstrap state tracking - indicates whether peer discovery has completed
    is_bootstrapped: Arc<AtomicBool>,

    /// Whether `start()` has been called (and `stop()` has not yet completed)
    is_started: Arc<AtomicBool>,

    /// Per-peer locks that serialise reconnect attempts so concurrent sends
    /// to the same stale peer don't race to dial.  Entries accumulate over
    /// the node's lifetime; each is a lightweight `Arc<TokioMutex<()>>`.
    reconnect_locks: ParkingMutex<HashMap<PeerId, Arc<TokioMutex<()>>>>,

    /// The peer ID of the node currently relaying traffic for us (ADR-014).
    ///
    /// Set after the reachability classifier acquires a relay in `start()`.
    /// The relayer monitor watches this against the K-closest set: if the
    /// relayer drops out, it triggers rebinding.
    ///
    /// `None` when the node is publicly reachable (no relay needed) or
    /// before classification has run.
    relayer_peer_id: Arc<RwLock<Option<PeerId>>>,

    /// The relay-allocated public address (ADR-014).
    ///
    /// Set after a proactive MASQUE relay is acquired in `start()`. This is
    /// the address that external peers must dial to reach this node through
    /// the relay. `None` when the node is publicly reachable (no relay) or
    /// before classification has run.
    relay_address: Arc<RwLock<Option<SocketAddr>>>,
}
823
/// Normalize wildcard bind addresses to localhost loopback addresses
///
/// saorsa-transport correctly rejects "unspecified" addresses (0.0.0.0 and [::]) for remote connections
/// because you cannot connect TO an unspecified address - these are only valid for BINDING.
///
/// This function converts wildcard addresses to appropriate loopback addresses for local connections:
/// - IPv6 [::]:port → ::1:port (IPv6 loopback)
/// - IPv4 0.0.0.0:port → 127.0.0.1:port (IPv4 loopback)
/// - All other addresses pass through unchanged
///
/// # Arguments
/// * `addr` - The SocketAddr to normalize
///
/// # Returns
/// * Normalized SocketAddr suitable for remote connections
pub(crate) fn normalize_wildcard_to_loopback(addr: std::net::SocketAddr) -> std::net::SocketAddr {
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};

    // Match on the address family directly; non-wildcard addresses fall
    // through unchanged. The port is preserved in every case.
    match addr {
        SocketAddr::V4(v4) if v4.ip().is_unspecified() => {
            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), addr.port()) // 127.0.0.1
        }
        SocketAddr::V6(v6) if v6.ip().is_unspecified() => {
            SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), addr.port()) // ::1
        }
        other => other,
    }
}
854
855impl P2PNode {
856    /// Create a new P2P node with the given configuration
857    pub async fn new(config: NodeConfig) -> Result<Self> {
858        // Ensure a cryptographic identity exists — generate one if not provided.
859        let node_identity = match config.node_identity.clone() {
860            Some(identity) => identity,
861            None => Arc::new(NodeIdentity::generate()?),
862        };
863
864        // Derive the canonical peer ID from the cryptographic identity.
865        let peer_id = *node_identity.peer_id();
866
867        // Validate parameter safety constraints (Section 4 points 1-13).
868        // Reject invalid config early, before any resources are allocated.
869        config.dht_config.validate()?;
870        if let Some(ref diversity) = config.diversity_config {
871            diversity
872                .validate()
873                .map_err(|e| P2PError::Validation(format!("IP diversity config: {e}").into()))?;
874        }
875
876        // Initialize bootstrap cache manager
877        let bootstrap_config = config.bootstrap_cache_config.clone().unwrap_or_default();
878        let bootstrap_manager =
879            match BootstrapManager::with_node_config(bootstrap_config, &config).await {
880                Ok(manager) => Some(Arc::new(RwLock::new(manager))),
881                Err(e) => {
882                    warn!("Failed to initialize bootstrap manager: {e}, continuing without cache");
883                    None
884                }
885            };
886
887        // Build transport handle with all transport-level concerns
888        let transport_config = crate::transport_handle::TransportConfig::from_node_config(
889            &config,
890            crate::DEFAULT_EVENT_CHANNEL_CAPACITY,
891            node_identity.clone(),
892        );
893        let transport =
894            Arc::new(crate::transport_handle::TransportHandle::new(transport_config).await?);
895
896        // Initialize AdaptiveDHT — creates the trust engine and DHT manager
897        let dht_manager_config = DhtNetworkConfig {
898            peer_id,
899            node_config: config.clone(),
900            request_timeout: config.connection_timeout,
901            max_concurrent_operations: MAX_ACTIVE_REQUESTS,
902            enable_security: true,
903            swap_threshold: 0.0, // Set by AdaptiveDHT::new() from AdaptiveDhtConfig
904        };
905        let adaptive_dht = AdaptiveDHT::new(
906            transport.clone(),
907            dht_manager_config,
908            config.adaptive_dht_config.clone(),
909        )
910        .await?;
911
912        let node = Self {
913            config,
914            peer_id,
915            transport,
916            start_time: Instant::now(),
917            shutdown: CancellationToken::new(),
918            adaptive_dht,
919            bootstrap_manager,
920            is_bootstrapped: Arc::new(AtomicBool::new(false)),
921            is_started: Arc::new(AtomicBool::new(false)),
922            reconnect_locks: ParkingMutex::new(HashMap::new()),
923            relayer_peer_id: Arc::new(RwLock::new(None)),
924            relay_address: Arc::new(RwLock::new(None)),
925        };
926        info!(
927            "Created P2P node with peer ID: {} (call start() to begin networking)",
928            node.peer_id
929        );
930
931        Ok(node)
932    }
933
    /// Get the peer ID of this node.
    ///
    /// This is the canonical ID derived from the node's cryptographic
    /// identity at construction time; it never changes over the node's life.
    pub fn peer_id(&self) -> &PeerId {
        &self.peer_id
    }

    /// Get the transport handle for sharing with other components.
    pub fn transport(&self) -> &Arc<crate::transport_handle::TransportHandle> {
        &self.transport
    }

    /// The relay-allocated public address, if this node acquired a MASQUE relay.
    ///
    /// Returns `Some(addr)` when the node is behind NAT and successfully
    /// acquired a proactive relay during `start()`. External peers must dial
    /// this address to reach the node through the relay. Returns `None` when
    /// the node is publicly reachable or no relay was established.
    pub async fn relay_address(&self) -> Option<SocketAddr> {
        *self.relay_address.read().await
    }

    /// The local address reported by the transport layer, if one is bound.
    pub fn local_addr(&self) -> Option<MultiAddr> {
        self.transport.local_addr()
    }

    /// Check if the node has completed the initial bootstrap process
    ///
    /// Returns `true` if the node has successfully connected to at least one
    /// bootstrap peer and performed peer discovery (FIND_NODE).
    pub fn is_bootstrapped(&self) -> bool {
        self.is_bootstrapped.load(Ordering::SeqCst)
    }

    /// Manually trigger re-bootstrap (useful for recovery or network rejoin)
    ///
    /// This clears the bootstrapped state and attempts to reconnect to
    /// bootstrap peers and discover new peers.
    pub async fn re_bootstrap(&self) -> Result<()> {
        // Clear the flag first so is_bootstrapped() reports false while
        // the reconnection attempt is in flight.
        self.is_bootstrapped.store(false, Ordering::SeqCst);
        self.connect_bootstrap_peers(None).await
    }
974
975    // =========================================================================
976    // Trust API — delegates to AdaptiveDHT
977    // =========================================================================
978
    /// Get the trust engine for advanced use cases.
    ///
    /// Returns a cloned `Arc`, so the engine can be held independently of
    /// the node's lifetime.
    pub fn trust_engine(&self) -> Arc<TrustEngine> {
        self.adaptive_dht.trust_engine().clone()
    }

    /// Report a trust event for a peer.
    ///
    /// Core only records penalties (connection failures). Positive trust
    /// signals are the consumer's responsibility via [`TrustEvent::ApplicationSuccess`].
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use saorsa_core::adaptive::TrustEvent;
    ///
    /// node.report_trust_event(&peer_id, TrustEvent::ApplicationSuccess(1.0)).await;
    /// node.report_trust_event(&peer_id, TrustEvent::ConnectionFailed).await;
    /// ```
    pub async fn report_trust_event(&self, peer_id: &PeerId, event: TrustEvent) {
        self.adaptive_dht.report_trust_event(peer_id, event).await;
    }

    /// Get the current trust score for a peer (0.0 to 1.0).
    ///
    /// Returns 0.5 (neutral) for unknown peers.
    pub fn peer_trust(&self, peer_id: &PeerId) -> f64 {
        self.adaptive_dht.peer_trust(peer_id)
    }

    /// Get the AdaptiveDHT component for direct access.
    pub fn adaptive_dht(&self) -> &AdaptiveDHT {
        &self.adaptive_dht
    }
1012
1013    // =========================================================================
1014    // Request/Response API — Automatic Trust Feedback
1015    // =========================================================================
1016
1017    /// Send a request to a peer and wait for a response with automatic trust penalty reporting.
1018    ///
1019    /// Unlike fire-and-forget `send_message()`, this method:
1020    /// 1. Wraps the payload in a `RequestResponseEnvelope` with a unique message ID
1021    /// 2. Sends it on the `/rr/<protocol>` protocol prefix
1022    /// 3. Waits for a matching response (or timeout)
1023    /// 4. Automatically reports failure to the trust engine (success is the expected baseline)
1024    ///
1025    /// The remote peer's handler should call `send_response()` with the
1026    /// incoming message ID to route the response back.
1027    ///
1028    /// # Arguments
1029    ///
1030    /// * `peer_id` - Target peer
1031    /// * `protocol` - Application protocol name (e.g. `"peer_info"`)
1032    /// * `data` - Request payload bytes
1033    /// * `timeout` - Maximum time to wait for a response
1034    ///
1035    /// # Returns
1036    ///
1037    /// A `PeerResponse` on success, or an error on timeout / connection failure.
1038    ///
1039    /// # Example
1040    ///
1041    /// ```rust,ignore
1042    /// let response = node.send_request(&peer_id, "peer_info", request_data, Duration::from_secs(10)).await?;
1043    /// println!("Got {} bytes from {}", response.data.len(), response.peer_id);
1044    /// ```
1045    pub async fn send_request(
1046        &self,
1047        peer_id: &PeerId,
1048        protocol: &str,
1049        data: Vec<u8>,
1050        timeout: Duration,
1051    ) -> Result<PeerResponse> {
1052        match self
1053            .transport
1054            .send_request(peer_id, protocol, data, timeout)
1055            .await
1056        {
1057            Ok(resp) => Ok(resp),
1058            Err(e) => {
1059                let event = if matches!(&e, P2PError::Timeout(_)) {
1060                    TrustEvent::ConnectionTimeout
1061                } else {
1062                    TrustEvent::ConnectionFailed
1063                };
1064                self.report_trust_event(peer_id, event).await;
1065                Err(e)
1066            }
1067        }
1068    }
1069
    /// Send a response to a previously received request.
    ///
    /// `message_id` must be the ID taken from the incoming request envelope
    /// so the transport layer can route the response back to the waiting
    /// `send_request()` call on the remote side.
    pub async fn send_response(
        &self,
        peer_id: &PeerId,
        protocol: &str,
        message_id: &str,
        data: Vec<u8>,
    ) -> Result<()> {
        self.transport
            .send_response(peer_id, protocol, message_id, data)
            .await
    }

    /// Parse a request/response envelope from raw message bytes.
    ///
    /// Returns `None` when the bytes are not a valid envelope.
    /// NOTE(review): tuple fields appear to be (message_id, is_request,
    /// payload) — confirm against `TransportHandle::parse_request_envelope`.
    pub fn parse_request_envelope(data: &[u8]) -> Option<(String, bool, Vec<u8>)> {
        crate::transport_handle::TransportHandle::parse_request_envelope(data)
    }

    /// Subscribe to a topic on the transport layer.
    pub async fn subscribe(&self, topic: &str) -> Result<()> {
        self.transport.subscribe(topic).await
    }

    /// Publish data to a topic on the transport layer.
    pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
        self.transport.publish(topic, data).await
    }

    /// Get the node configuration
    pub fn config(&self) -> &NodeConfig {
        &self.config
    }
1098
    /// Start the P2P node.
    ///
    /// Brings the node online in order: bootstrap-cache maintenance,
    /// transport listeners, the adaptive DHT layer (DHT manager + trust
    /// engine), close-group cache import (trust scores), bootstrap-peer
    /// connection, the relay-acquisition driver (non-client modes only),
    /// and the transport→DHT address-bridge background task. Sets
    /// `is_started` on success.
    ///
    /// # Errors
    ///
    /// Returns an error if the bootstrap manager, transport listeners,
    /// adaptive DHT, or bootstrap-peer connection fail to start. Cache
    /// loading failures are logged and tolerated.
    pub async fn start(&self) -> Result<()> {
        info!("Starting P2P node...");

        // Start bootstrap manager background tasks
        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
            let mut manager = bootstrap_manager.write().await;
            manager
                .start_maintenance()
                .map_err(|e| protocol_error(format!("Failed to start bootstrap manager: {e}")))?;
            info!("Bootstrap cache manager started");
        }

        // Start transport listeners and message receiving
        self.transport.start_network_listeners().await?;

        // Start the adaptive DHT layer (DHT manager + trust engine)
        self.adaptive_dht.start().await?;

        // Log current listen addresses
        let listen_addrs = self.transport.listen_addrs().await;
        info!("P2P node started on addresses: {:?}", listen_addrs);

        // NOTE: Message receiving is now integrated into the accept loop in start_network_listeners()
        // The old start_message_receiving_system() is no longer needed as it competed with the accept
        // loop for incoming connections, causing messages to be lost.

        // Load close group cache and import trust scores before connecting to peers.
        // This ensures trust scores are available when peers are added to the routing table.
        let close_group_cache = if let Some(ref dir) = self.config.close_group_cache_dir {
            match CloseGroupCache::load_from_dir(dir).await {
                Ok(Some(cache)) => {
                    // Filter out peers with non-finite trust scores (NaN/Inf)
                    // that could corrupt trust engine state or sort ordering.
                    let original_count = cache.peers.len();
                    let cache = CloseGroupCache {
                        peers: cache
                            .peers
                            .into_iter()
                            .filter(|p| p.trust.score.is_finite())
                            .collect(),
                        ..cache
                    };
                    let filtered_count = original_count - cache.peers.len();
                    if filtered_count > 0 {
                        warn!(
                            "Filtered {filtered_count} peers with non-finite trust scores from close group cache"
                        );
                    }

                    let trust_snapshot = TrustSnapshot {
                        peers: cache
                            .peers
                            .iter()
                            .map(|p| (p.peer_id, p.trust.clone()))
                            .collect(),
                    };
                    self.adaptive_dht
                        .trust_engine()
                        .import_snapshot(&trust_snapshot);
                    info!(
                        "Loaded {} peers from close group cache (trust scores imported)",
                        cache.peers.len()
                    );
                    Some(cache)
                }
                Ok(None) => {
                    debug!(
                        "No close group cache found in {}, fresh start",
                        dir.display()
                    );
                    None
                }
                Err(e) => {
                    // Best-effort: a corrupt or unreadable cache must not
                    // abort startup.
                    warn!(
                        "Failed to load close group cache from {}: {e}",
                        dir.display()
                    );
                    None
                }
            }
        } else {
            None
        };

        // Connect to bootstrap peers
        self.connect_bootstrap_peers(close_group_cache.as_ref())
            .await?;

        // Emit BootstrapComplete — the node is connected to the network and
        // the DHT routing table is populated; consumers waiting on this
        // event can start issuing queries. The relay-acquisition driver
        // runs asynchronously after this point, so the node's published
        // self-record may be direct-only for a brief window until the
        // driver's first acquisition attempt finishes.
        {
            let dht = self.adaptive_dht.dht_manager();
            let rt_size = dht.get_routing_table_size().await;
            dht.emit_event(DhtNetworkEvent::BootstrapComplete { num_peers: rt_size });
        }

        // Spawn the relay-acquisition driver for Node mode.
        //
        // The driver unconditionally tries to acquire a MASQUE relay from
        // an XOR-closest peer right after bootstrap — there is no public/
        // private classification. Private candidates are filtered out
        // ambiently: their Direct addresses are unreachable from outside
        // their NAT, so the QUIC dial fails and the walker advances to
        // the next-closest peer.
        //
        // The driver also owns the relay-lost → republish → reacquire
        // state machine (see `reachability::driver` for the full flow).
        // Clients (`NodeMode::Client`) do not run the driver at all: they
        // are outbound-only and do not need to be reachable.
        if self.config.mode != NodeMode::Client {
            spawn_acquisition_driver(
                self.adaptive_dht.dht_manager().clone(),
                Arc::clone(&self.transport),
                Arc::clone(&self.relayer_peer_id),
                Arc::clone(&self.relay_address),
                self.shutdown.clone(),
            );
        } else {
            info!("client mode — skipping relay acquisition driver");
        }

        // Spawn background task to forward peer address updates to the DHT.
        //
        // Two event streams are bridged from the transport layer onto DHT
        // routing-table mutations:
        //
        //  - **Relay established**: when THIS node sets up a MASQUE relay,
        //    perform a DHT self-lookup so the transport's re-advertisement
        //    loop can ADD_ADDRESS the new relay address to the K closest
        //    peers — propagating it beyond peers we already happen to be
        //    connected to.
        //  - **Peer address update**: when a connected peer advertises a new
        //    reachable address via ADD_ADDRESS (typically its relay), update
        //    the DHT routing table so future lookups return that address.
        //
        // Both are handled in a `tokio::select!` against the receiver
        // futures so updates propagate immediately. The previous
        // implementation polled both queues on a 1-second interval, which
        // opened a race window in which a freshly-established relay was
        // invisible to outbound DHT queries until the next tick — causing
        // the first peers to dial direct (and fail) before learning about
        // the relay.
        //
        // **Slow work isolation**: the relay-propagation path runs an
        // iterative DHT lookup (`find_closest_nodes_network`) which can
        // take many seconds. Doing it inline in the select loop would
        // starve the peer-address-update branch and back up the bounded
        // forwarder mpsc into drop territory. Instead, the lookup +
        // publish is detached into its own task per relay event, so the
        // select loop keeps polling both branches.
        // DHT_BRIDGE: forward peer-advertised address updates from the
        // transport layer onto DHT routing table mutations. When a connected
        // peer's ADD_ADDRESS notification carries a different IP than the
        // connection's source (i.e., the peer is behind a relay or has
        // migrated), merge the advertised address into the peer's DHT entry.
        //
        // This node's OWN relay state changes are NOT handled here — the
        // relay acquisition driver (see `reachability::driver`) owns them
        // directly, so the "relay established" branch no longer belongs to
        // the bridge. The driver knows the full typed address set for the
        // self-record; the bridge did not.
        {
            let transport = Arc::clone(&self.transport);
            let dht = self.adaptive_dht.dht_manager().clone();
            let shutdown = self.shutdown.clone();
            tokio::spawn(async move {
                loop {
                    tokio::select! {
                        biased;
                        _ = shutdown.cancelled() => break,
                        update = transport.recv_peer_address_update() => {
                            let Some((peer_addr, advertised_addr)) = update else { break };
                            let normalized_peer =
                                saorsa_transport::shared::normalize_socket_addr(peer_addr);
                            let normalized_adv =
                                saorsa_transport::shared::normalize_socket_addr(advertised_addr);
                            // Only update DHT when the advertised IP differs
                            // from the peer's connection IP. Same-IP updates
                            // are just different NATted ports (useless for
                            // symmetric NAT); different-IP means a relay.
                            if normalized_peer.ip() == normalized_adv.ip() {
                                debug!(
                                    "DHT_BRIDGE: dropping same-IP update peer={} addr={}",
                                    normalized_peer,
                                    normalized_adv
                                );
                                continue;
                            }
                            info!(
                                "DHT_BRIDGE: processing relay update peer={} addr={}",
                                normalized_peer,
                                normalized_adv
                            );
                            // Look up peer ID by address (tries both IPv4 and
                            // IPv4-mapped IPv6 forms via dual_stack_alternate).
                            // For symmetric NAT, this may fail because the
                            // connection's channel key uses a different NATted port.
                            if let Some(peer_id) = transport.peer_id_for_addr(&normalized_peer).await {
                                let multi_addr = MultiAddr::quic(normalized_adv);
                                info!(
                                    "Updating DHT: peer {} relay address {} (connection was {})",
                                    peer_id, advertised_addr, peer_addr
                                );
                                dht.touch_node_typed(
                                    &peer_id,
                                    Some(&multi_addr),
                                    AddressType::Relay,
                                )
                                .await;
                            }
                        }
                    }
                }
            });
        }

        // Release pairs with the Acquire load in is_running().
        self.is_started
            .store(true, std::sync::atomic::Ordering::Release);

        Ok(())
    }
1325
1326    // start_network_listeners and start_message_receiving_system
1327    // are now implemented in TransportHandle
1328
    /// Run the P2P node (blocks until shutdown)
    ///
    /// Starts the node first if it is not already running, then parks on
    /// the shutdown token. Call [`stop`](Self::stop) (or `shutdown`) from
    /// another task to unblock this method.
    pub async fn run(&self) -> Result<()> {
        if !self.is_running() {
            self.start().await?;
        }

        info!("P2P node running...");

        // Block until shutdown is signalled. All background work (connection
        // lifecycle, DHT maintenance, EigenTrust) runs in dedicated tasks.
        self.shutdown.cancelled().await;

        info!("P2P node stopped");
        Ok(())
    }
1344
    /// Stop the P2P node
    ///
    /// Teardown order matters: the close-group cache is persisted first
    /// (while the DHT/transport state is still intact), then the shutdown
    /// token is cancelled, then the DHT layer stops (so leave messages can
    /// still use the live transport), and the transport stops last.
    ///
    /// # Errors
    ///
    /// Returns an error if stopping the DHT layer or the transport fails.
    /// A cache-save failure is only logged, never fatal.
    pub async fn stop(&self) -> Result<()> {
        info!("Stopping P2P node...");

        // Save close group cache before tearing down the DHT and transport layers.
        if let Some(ref dir) = self.config.close_group_cache_dir
            && let Err(e) = self.save_close_group_cache(dir).await
        {
            warn!("Failed to save close group cache on shutdown: {e}");
        }

        // Signal the run loop to exit
        self.shutdown.cancel();

        // Stop DHT layer first so leave messages can be sent while transport is still active.
        self.adaptive_dht.stop().await?;

        // Stop the transport layer (shutdown endpoints, join tasks, disconnect peers)
        self.transport.stop().await?;

        // Release pairs with the Acquire load in is_running().
        self.is_started
            .store(false, std::sync::atomic::Ordering::Release);

        info!("P2P node stopped");
        Ok(())
    }

    /// Graceful shutdown alias for tests
    pub async fn shutdown(&self) -> Result<()> {
        self.stop().await
    }
1376
1377    /// Check if the node is running
1378    pub fn is_running(&self) -> bool {
1379        self.is_started.load(std::sync::atomic::Ordering::Acquire) && !self.shutdown.is_cancelled()
1380    }
1381
    /// Get the current listen addresses reported by the transport layer.
    pub async fn listen_addrs(&self) -> Vec<MultiAddr> {
        self.transport.listen_addrs().await
    }

    /// Get the IDs of currently connected peers.
    pub async fn connected_peers(&self) -> Vec<PeerId> {
        self.transport.connected_peers().await
    }

    /// Get the number of connected peers as reported by the transport layer.
    pub async fn peer_count(&self) -> usize {
        self.transport.peer_count().await
    }

    /// Get transport-level info for a peer, or `None` if unknown.
    pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
        self.transport.peer_info(peer_id).await
    }

    /// Get the channel ID for a given address, if connected (internal only).
    #[allow(dead_code)]
    pub(crate) async fn get_channel_id_by_address(&self, addr: &MultiAddr) -> Option<String> {
        self.transport.get_channel_id_by_address(addr).await
    }

    /// List all active transport-level connections (internal only).
    ///
    /// Each entry is a channel ID paired with its known addresses.
    #[allow(dead_code)]
    pub(crate) async fn list_active_connections(&self) -> Vec<(String, Vec<MultiAddr>)> {
        self.transport.list_active_connections().await
    }

    /// Remove a channel from the peers map (internal only).
    ///
    /// Returns whether a channel was actually removed. Bookkeeping only —
    /// does not close the underlying connection (see `disconnect_channel`).
    #[allow(dead_code)]
    pub(crate) async fn remove_channel(&self, channel_id: &str) -> bool {
        self.transport.remove_channel(channel_id).await
    }

    /// Close a channel's QUIC connection and remove it from all tracking maps.
    ///
    /// Use when a transport-level connection was established but identity
    /// exchange failed, so no [`PeerId`] is available for [`disconnect_peer`].
    pub(crate) async fn disconnect_channel(&self, channel_id: &str) {
        self.transport.disconnect_channel(channel_id).await;
    }

    /// Check if an authenticated peer is connected (has at least one active channel).
    pub async fn is_peer_connected(&self, peer_id: &PeerId) -> bool {
        self.transport.is_peer_connected(peer_id).await
    }
1432
    /// Connect to a peer, returning the transport-level channel ID.
    ///
    /// The returned channel ID is **not** the app-level [`PeerId`]. To obtain
    /// the authenticated peer identity, call
    /// [`wait_for_peer_identity`](Self::wait_for_peer_identity) with the
    /// returned channel ID.
    ///
    /// Callers that already know how the address was classified should
    /// prefer [`Self::connect_peer_typed`] so the resulting log line
    /// carries an accurate `kind` field instead of `unknown`.
    pub async fn connect_peer(&self, address: &MultiAddr) -> Result<String> {
        self.transport.connect_peer(address).await
    }

    /// Connect to a peer at the given typed address.
    ///
    /// Same as [`Self::connect_peer`] but threads the [`AddressType`]
    /// through to the transport-level dial log so an operator can tell,
    /// after the fact, whether a failed dial was against a `Direct`,
    /// `Relay`, `Unverified`, or `NATted` address.
    pub async fn connect_peer_typed(
        &self,
        address: &MultiAddr,
        kind: AddressType,
    ) -> Result<String> {
        self.transport.connect_peer_typed(address, kind).await
    }

    /// Wait for the identity exchange on `channel_id` to complete, returning
    /// the authenticated [`PeerId`].
    ///
    /// Use this after [`connect_peer`](Self::connect_peer) to bridge the gap
    /// between the transport-level channel ID and the app-level peer identity
    /// required by [`send_message`](Self::send_message).
    ///
    /// # Errors
    ///
    /// Fails if the exchange does not complete within `timeout`.
    pub async fn wait_for_peer_identity(
        &self,
        channel_id: &str,
        timeout: Duration,
    ) -> Result<PeerId> {
        self.transport
            .wait_for_peer_identity(channel_id, timeout)
            .await
    }

    /// Disconnect from an authenticated peer (all of its channels).
    pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
        self.transport.disconnect_peer(peer_id).await
    }

    /// Check if a connection to a peer is active (internal only).
    #[allow(dead_code)]
    pub(crate) async fn is_connection_active(&self, channel_id: &str) -> bool {
        self.transport.is_connection_active(channel_id).await
    }
1487
1488    /// Send a message to an authenticated peer, reconnecting on demand.
1489    ///
1490    /// Tries the existing connection first. If the send fails (stale QUIC
1491    /// session, peer not found, etc.), resolves a dial address from:
1492    ///
1493    /// 1. Caller-provided `addrs` (highest priority)
1494    /// 2. Addresses cached in the transport layer (snapshotted before the
1495    ///    send attempt, since stale-channel cleanup removes them)
1496    /// 3. DHT routing table
1497    ///
1498    /// Then dials, waits for identity exchange, and retries the send exactly
1499    /// once on the fresh connection.  Concurrent reconnects to the same peer
1500    /// are serialised so only one dial is attempted at a time.
    pub async fn send_message(
        &self,
        peer_id: &PeerId,
        protocol: &str,
        data: Vec<u8>,
        addrs: &[MultiAddr],
    ) -> Result<()> {
        // Snapshot channel IDs before the send attempt — transport.send_message
        // prunes dead channels from bookkeeping but does NOT close the
        // underlying QUIC connection.  We need the original IDs for
        // disconnect_channel later.
        let existing_channels = self.transport.channels_for_peer(peer_id).await;

        // No existing connection — serialise so concurrent sends to the same
        // unconnected peer don't each open their own QUIC connection.
        if existing_channels.is_empty() {
            let lock = self.reconnect_lock_for(peer_id);
            // Guard is held until this branch returns, covering both the
            // connected re-check and the dial inside reconnect_and_send.
            let _guard = lock.lock().await;

            // Another sender may have connected while we waited for the lock.
            if self.transport.is_peer_connected(peer_id).await {
                return self.transport.send_message(peer_id, protocol, data).await;
            }

            // No saved addresses and no stale channels on this path — the
            // peer had no transport bookkeeping to begin with.
            return self
                .reconnect_and_send(peer_id, protocol, data, addrs, &[], &[])
                .await;
        }

        // Snapshot addresses before the send attempt — transport.send_message
        // prunes stale channels, which removes peer_info.
        let saved_addrs: Vec<MultiAddr> = self
            .transport
            .peer_info(peer_id)
            .await
            .map(|info| info.addresses)
            .unwrap_or_default();

        // Clone data for retry — only stale-channel failures are retried, but
        // transport.send_message consumes the Vec.
        let retry_data = data.clone();

        // Fast path: try existing connection.
        match self.transport.send_message(peer_id, protocol, data).await {
            Ok(()) => return Ok(()),
            Err(e) => {
                // Only stale-channel failures trigger a reconnect; any other
                // failure on a live channel is surfaced to the caller.
                if !e.is_stale_channel_send_failure() {
                    debug!(
                        peer = %peer_id.to_hex(),
                        error = %e,
                        "send failed during active channel use, not reconnecting",
                    );
                    return Err(e);
                }

                debug!(
                    peer = %peer_id.to_hex(),
                    error = %e,
                    "stale channel send failed, attempting reconnect",
                );
            }
        }

        // Serialise reconnect attempts so concurrent sends to the same
        // stale peer don't race to dial.
        let lock = self.reconnect_lock_for(peer_id);
        let _guard = lock.lock().await;

        // Another sender may have reconnected while we waited for the lock.
        if self.transport.is_peer_connected(peer_id).await {
            // Close stale QUIC connections that remove_channel (called inside
            // transport.send_message on failure) didn't tear down — it only
            // removes bookkeeping, not the underlying QUIC session.
            for channel_id in &existing_channels {
                self.transport.disconnect_channel(channel_id).await;
            }
            return self
                .transport
                .send_message(peer_id, protocol, retry_data)
                .await;
        }

        self.reconnect_and_send(
            peer_id,
            protocol,
            retry_data,
            addrs,
            &saved_addrs,
            &existing_channels,
        )
        .await
    }
1593
    /// Tear down stale channels, reconnect to a peer, and send a message.
    ///
    /// Dial-address preference: `addrs` (caller-provided) over `saved_addrs`
    /// (snapshot of transport bookkeeping) over the DHT routing table.
    /// `stale_channels` are disconnected — followed by a short grace period —
    /// before dialing so the fresh connection does not race the old QUIC
    /// session.
    ///
    /// # Errors
    ///
    /// * `PeerNotFound` — no dialable address could be resolved.
    /// * `IdentityMismatch` — the dialed peer authenticated as a different
    ///   identity than `peer_id`.
    /// * Transport errors from dialing, identity exchange, or the final send
    ///   are propagated as-is.
    async fn reconnect_and_send(
        &self,
        peer_id: &PeerId,
        protocol: &str,
        data: Vec<u8>,
        addrs: &[MultiAddr],
        saved_addrs: &[MultiAddr],
        stale_channels: &[String],
    ) -> Result<()> {
        // Resolve a dial address: caller-provided > saved > DHT.
        let (address, kind) = self
            .resolve_dial_address(peer_id, addrs, saved_addrs)
            .await
            .ok_or_else(|| {
                P2PError::Network(NetworkError::PeerNotFound(peer_id.to_hex().into()))
            })?;

        // Tear down stale QUIC connections using their actual channel IDs.
        // transport.send_message only removes bookkeeping (peer_to_channel,
        // peers, active_connections) — it does NOT close the underlying QUIC
        // connection.  We must use the real channel IDs, not the resolved
        // dial address, because NAT / port migration can make them differ.
        if !stale_channels.is_empty() {
            for channel_id in stale_channels {
                self.transport.disconnect_channel(channel_id).await;
            }
            tokio::time::sleep(QUIC_TEARDOWN_GRACE).await;
        }

        // Dial and wait for identity exchange.
        let channel_id = self.transport.connect_peer_typed(&address, kind).await?;
        let authenticated = match self
            .transport
            .wait_for_peer_identity(&channel_id, IDENTITY_EXCHANGE_TIMEOUT)
            .await
        {
            Ok(peer) => peer,
            Err(e) => {
                // Close the freshly-dialed QUIC connection so it doesn't
                // linger as a zombie until idle timeout.
                self.transport.disconnect_channel(&channel_id).await;
                return Err(e);
            }
        };

        // Reject the connection when it authenticates as someone other than
        // the peer we meant to reach.
        if &authenticated != peer_id {
            self.transport.disconnect_channel(&channel_id).await;
            return Err(P2PError::Identity(IdentityError::IdentityMismatch {
                expected: peer_id.to_hex().into(),
                actual: authenticated.to_hex().into(),
            }));
        }

        // Send on the fresh connection.
        self.transport.send_message(peer_id, protocol, data).await
    }
1651
1652    /// Resolve a dial address for `peer_id`, preferring caller-provided
1653    /// addresses over cached/DHT sources.
1654    ///
1655    /// Returns the first dialable (QUIC, non-unspecified) address found,
1656    /// paired with the [`AddressType`] the DHT routing table believes
1657    /// for that address. Caller-provided / saved addresses that don't
1658    /// appear in the routing table fall back to
1659    /// [`AddressType::Unverified`] — the same default the routing table
1660    /// applies to legacy peers that never asserted reachability.
1661    /// Returns `None` when no dialable address is available.
1662    async fn resolve_dial_address(
1663        &self,
1664        peer_id: &PeerId,
1665        caller_addrs: &[MultiAddr],
1666        saved_addrs: &[MultiAddr],
1667    ) -> Option<(MultiAddr, AddressType)> {
1668        // Caller- and saved-supplied addresses skip the routing-table read.
1669        // The kind is only consumed as a log tag by `connect_peer_typed`, so
1670        // defaulting to Unverified — the same fallback the routing table
1671        // applies to legacy peers — saves an async lookup on the hot
1672        // reconnect path. Only consult the DHT when both upstream sources
1673        // are exhausted.
1674        if let Some(addr) = Self::first_dialable(caller_addrs) {
1675            return Some((addr, AddressType::Unverified));
1676        }
1677        if let Some(addr) = Self::first_dialable(saved_addrs) {
1678            return Some((addr, AddressType::Unverified));
1679        }
1680
1681        self.adaptive_dht
1682            .peer_addresses_for_dial_typed(peer_id)
1683            .await
1684            .into_iter()
1685            .find(|(a, _)| {
1686                a.dialable_socket_addr()
1687                    .is_some_and(|sa| !sa.ip().is_unspecified())
1688            })
1689    }
1690
1691    /// Return the first dialable QUIC address from a slice, skipping
1692    /// non-QUIC and unspecified (`0.0.0.0` / `::`) addresses.
1693    fn first_dialable(addrs: &[MultiAddr]) -> Option<MultiAddr> {
1694        addrs
1695            .iter()
1696            .find(|a| {
1697                let dialable = a
1698                    .dialable_socket_addr()
1699                    .is_some_and(|sa| !sa.ip().is_unspecified());
1700                if !dialable {
1701                    trace!(address = %a, "skipping non-dialable address");
1702                }
1703                dialable
1704            })
1705            .cloned()
1706    }
1707
1708    /// Get or create a per-peer reconnect lock.
1709    fn reconnect_lock_for(&self, peer_id: &PeerId) -> Arc<TokioMutex<()>> {
1710        self.reconnect_locks
1711            .lock()
1712            .entry(*peer_id)
1713            .or_insert_with(|| Arc::new(TokioMutex::new(())))
1714            .clone()
1715    }
1716}
1717
// `parse_protocol_message` (defined below, after the replay-window
// constants) parses a postcard-encoded protocol message into a
// `P2PEvent::Message`, returning `None` if the bytes cannot be deserialized
// as a valid `WireMessage`. (These lines were previously `///` doc comments,
// which mistakenly attached them to `MAX_MESSAGE_AGE_SECS` rather than the
// function they describe.)
//
// The `from` field is a required part of the wire protocol but is **not**
// used as the event source. Instead, the authenticated app-level identity
// (derived from the message signature, when present) is used so that
// consumers can pass it directly to `send_message()`. This eliminates a
// spoofing vector where a peer could claim an arbitrary identity via the
// payload.
//
1728/// Maximum allowed clock skew for message timestamps.
1729///
1730/// A decentralized network cannot assume participants have accurate clocks.
1731/// Consumer devices commonly have clocks that drift by minutes (no NTP, wrong
1732/// timezone offset applied to UTC, suspended laptops, VMs without guest
1733/// additions, etc.). Both past and future windows must be symmetric and
1734/// generous enough that normal clock drift does not partition the network.
1735///
1736/// 5 minutes in both directions provides replay protection while tolerating
1737/// the clock skew observed in real-world deployments (31-42 seconds was
1738/// measured between a macOS client and NTP-synced VPS nodes).
1739const MAX_MESSAGE_AGE_SECS: u64 = 300;
1740/// Maximum allowed future timestamp — symmetric with the past window.
1741const MAX_FUTURE_SECS: u64 = 300;
1742
1743/// Convenience constructor for `P2PError::Network(NetworkError::ProtocolError(...))`.
1744fn protocol_error(msg: impl std::fmt::Display) -> P2PError {
1745    P2PError::Network(NetworkError::ProtocolError(msg.to_string().into()))
1746}
1747
1748/// Helper to send an event via a broadcast sender, logging at trace level if no receivers.
1749pub(crate) fn broadcast_event(tx: &broadcast::Sender<P2PEvent>, event: P2PEvent) {
1750    if let Err(e) = tx.send(event) {
1751        tracing::trace!("Event broadcast has no receivers: {e}");
1752    }
1753}
1754
/// Result of parsing a protocol message, including optional authenticated identity.
///
/// Produced by `parse_protocol_message`; `authenticated_node_id` is `Some`
/// only when the wire message carried a signature that verified and whose
/// `from` field matched the signing key.
pub(crate) struct ParsedMessage {
    /// The P2P event to broadcast.
    pub(crate) event: P2PEvent,
    /// If the message was signed and verified, the authenticated app-level [`PeerId`].
    pub(crate) authenticated_node_id: Option<PeerId>,
    /// The sender's user agent string from the wire message.
    pub(crate) user_agent: String,
}
1764
1765pub(crate) fn parse_protocol_message(bytes: &[u8], source: &str) -> Option<ParsedMessage> {
1766    let message: WireMessage = postcard::from_bytes(bytes).ok()?;
1767
1768    // Validate timestamp to prevent replay attacks
1769    let now = std::time::SystemTime::now()
1770        .duration_since(std::time::UNIX_EPOCH)
1771        .map(|d| d.as_secs())
1772        .unwrap_or(0);
1773
1774    // Reject messages that are too old (potential replay)
1775    if message.timestamp < now.saturating_sub(MAX_MESSAGE_AGE_SECS) {
1776        tracing::warn!(
1777            "Rejecting stale message from {} (timestamp {} is {} seconds old)",
1778            source,
1779            message.timestamp,
1780            now.saturating_sub(message.timestamp)
1781        );
1782        return None;
1783    }
1784
1785    // Reject messages too far in the future (clock manipulation)
1786    if message.timestamp > now + MAX_FUTURE_SECS {
1787        tracing::warn!(
1788            "Rejecting future-dated message from {} (timestamp {} is {} seconds ahead)",
1789            source,
1790            message.timestamp,
1791            message.timestamp.saturating_sub(now)
1792        );
1793        return None;
1794    }
1795
1796    // Verify app-level signature if present
1797    let authenticated_node_id = if !message.signature.is_empty() {
1798        match verify_message_signature(&message) {
1799            Ok(peer_id) => {
1800                debug!(
1801                    "Message from {} authenticated as app-level NodeId {}",
1802                    source, peer_id
1803                );
1804                Some(peer_id)
1805            }
1806            Err(e) => {
1807                warn!(
1808                    "Rejecting message from {}: signature verification failed: {}",
1809                    source, e
1810                );
1811                return None;
1812            }
1813        }
1814    } else {
1815        None
1816    };
1817
1818    debug!(
1819        "Parsed P2PEvent::Message - topic: {}, source: {:?} (transport: {}, logical: {}), payload_len: {}",
1820        message.protocol,
1821        authenticated_node_id,
1822        source,
1823        message.from,
1824        message.data.len()
1825    );
1826
1827    Some(ParsedMessage {
1828        event: P2PEvent::Message {
1829            topic: message.protocol,
1830            source: authenticated_node_id,
1831            data: message.data,
1832        },
1833        authenticated_node_id,
1834        user_agent: message.user_agent,
1835    })
1836}
1837
/// Verify the ML-DSA-65 signature on a WireMessage and return the authenticated [`PeerId`].
///
/// Besides verifying the cryptographic signature, this also checks that the
/// self-asserted `from` field matches the [`PeerId`] derived from the public
/// key. This prevents a sender from signing with their real key while
/// claiming a different identity in the `from` field.
///
/// # Errors
///
/// Returns a human-readable description of the failure: malformed public-key
/// or signature bytes, a `from`-field mismatch, failure to serialize the
/// signable bytes, a verification error, or an invalid signature.
fn verify_message_signature(message: &WireMessage) -> std::result::Result<PeerId, String> {
    let pubkey = MlDsaPublicKey::from_bytes(&message.public_key)
        .map_err(|e| format!("invalid public key: {e:?}"))?;

    // The identity this public key actually binds to.
    let peer_id = peer_id_from_public_key(&pubkey);

    // Validate that the self-asserted `from` field matches the public key.
    if message.from != peer_id {
        return Err(format!(
            "from field mismatch: message claims '{}' but public key derives '{}'",
            message.from, peer_id
        ));
    }

    // Reconstruct the exact byte sequence the sender signed; the tuple field
    // order (protocol, data, from, timestamp, user_agent) is part of the
    // wire contract and must match the signing side.
    let signable = postcard::to_stdvec(&(
        &message.protocol,
        &message.data as &[u8],
        &message.from,
        message.timestamp,
        &message.user_agent,
    ))
    .map_err(|e| format!("failed to serialize signable bytes: {e}"))?;

    let sig = MlDsaSignature::from_bytes(&message.signature)
        .map_err(|e| format!("invalid signature: {e:?}"))?;

    let valid = crate::quantum_crypto::ml_dsa_verify(&pubkey, &signable, &sig)
        .map_err(|e| format!("verification error: {e}"))?;

    if valid {
        Ok(peer_id)
    } else {
        Err("signature is invalid".to_string())
    }
}
1879
1880impl P2PNode {
    /// Subscribe to network events.
    ///
    /// Returns a fresh broadcast receiver for [`P2PEvent`]s emitted by the
    /// transport layer.
    pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
        self.transport.subscribe_events()
    }
1885
1886    /// Backwards-compat event stream accessor for tests
1887    pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1888        self.subscribe_events()
1889    }
1890
    /// Get node uptime — the time elapsed since `self.start_time` was
    /// recorded.
    pub fn uptime(&self) -> Duration {
        self.start_time.elapsed()
    }
1895
1896    // MCP removed: all MCP tool/service methods removed
1897
1898    // /// Handle MCP remote tool call with network integration
1899
1900    // /// List tools available on a specific remote peer
1901
1902    // /// Get MCP server statistics
1903
1904    // Background tasks (connection_lifecycle_monitor, keepalive, periodic_maintenance)
1905    // are now implemented in TransportHandle.
1906
1907    /// Check system health
1908    pub async fn health_check(&self) -> Result<()> {
1909        let peer_count = self.peer_count().await;
1910        if peer_count > self.config.max_connections {
1911            Err(protocol_error(format!(
1912                "Too many connections: {peer_count}"
1913            )))
1914        } else {
1915            Ok(())
1916        }
1917    }
1918
    /// Get the attached DHT manager.
    ///
    /// Delegates to the adaptive DHT layer, which holds the shared
    /// [`DhtNetworkManager`].
    pub fn dht_manager(&self) -> &Arc<DhtNetworkManager> {
        self.adaptive_dht.dht_manager()
    }
1923
1924    /// Backwards-compatible alias for `dht_manager()`.
1925    pub fn dht(&self) -> &Arc<DhtNetworkManager> {
1926        self.dht_manager()
1927    }
1928
1929    /// Add a discovered peer to the bootstrap cache
1930    pub async fn add_discovered_peer(
1931        &self,
1932        _peer_id: PeerId,
1933        addresses: Vec<MultiAddr>,
1934    ) -> Result<()> {
1935        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1936            let manager = bootstrap_manager.read().await;
1937            let socket_addresses: Vec<std::net::SocketAddr> = addresses
1938                .iter()
1939                .filter_map(|addr| addr.socket_addr())
1940                .collect();
1941            if let Some(&primary) = socket_addresses.first() {
1942                manager
1943                    .add_peer(&primary, socket_addresses)
1944                    .await
1945                    .map_err(|e| {
1946                        protocol_error(format!("Failed to add peer to bootstrap cache: {e}"))
1947                    })?;
1948            }
1949        }
1950        Ok(())
1951    }
1952
1953    /// Update connection metrics for a peer in the bootstrap cache
1954    pub async fn update_peer_metrics(
1955        &self,
1956        addr: &MultiAddr,
1957        success: bool,
1958        latency_ms: Option<u64>,
1959        _error: Option<String>,
1960    ) -> Result<()> {
1961        if let Some(ref bootstrap_manager) = self.bootstrap_manager
1962            && let Some(sa) = addr.socket_addr()
1963        {
1964            let manager = bootstrap_manager.read().await;
1965            if success {
1966                let rtt_ms = latency_ms.unwrap_or(0) as u32;
1967                manager.record_success(&sa, rtt_ms).await;
1968            } else {
1969                manager.record_failure(&sa).await;
1970            }
1971        }
1972        Ok(())
1973    }
1974
1975    /// Get bootstrap cache statistics
1976    pub async fn get_bootstrap_cache_stats(
1977        &self,
1978    ) -> Result<Option<crate::bootstrap::BootstrapStats>> {
1979        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1980            let manager = bootstrap_manager.read().await;
1981            Ok(Some(manager.stats().await))
1982        } else {
1983            Ok(None)
1984        }
1985    }
1986
1987    /// Get the number of cached bootstrap peers
1988    pub async fn cached_peer_count(&self) -> usize {
1989        if let Some(ref _bootstrap_manager) = self.bootstrap_manager
1990            && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
1991        {
1992            return stats.total_peers;
1993        }
1994        0
1995    }
1996
    /// Connect to bootstrap peers and perform initial peer discovery.
    ///
    /// If a `close_group_cache` was loaded on startup, its peers are injected
    /// as the highest-priority addresses (before configured and cached bootstrap
    /// peers). Their trust scores were already imported into the `TrustEngine`
    /// before this method is called.
    ///
    /// Dialing happens in two phases: close-group peers serially (preserving
    /// trust-priority ordering), then CLI-configured and cached bootstrap
    /// peers concurrently. Total outbound failure is not an error — inbound
    /// connections are checked as a fallback, and otherwise the node keeps
    /// running so background discovery can populate peers later. On success
    /// the node is marked bootstrapped and the close-group cache is saved.
    async fn connect_bootstrap_peers(
        &self,
        close_group_cache: Option<&CloseGroupCache>,
    ) -> Result<()> {
        // Each entry is a list of addresses for a single peer. Close-group
        // peers are dialed serially to preserve trust-priority ordering; CLI
        // and cached bootstrap peers are dialed concurrently to cut cold-start
        // latency when some peers are slow or dead.
        let mut serial_addr_sets: Vec<Vec<MultiAddr>> = Vec::new();
        let mut parallel_addr_sets: Vec<Vec<MultiAddr>> = Vec::new();
        let mut used_cache = false;
        // Socket-address dedupe set shared across all three priority tiers.
        let mut seen_addresses = std::collections::HashSet::new();

        // Priority 0: Cached close group peers (pre-trusted, highest priority).
        // These peers had trust scores loaded into the TrustEngine earlier in start(),
        // so they are already known-good when added to the routing table.
        // Sorted by trust score (highest first), then XOR distance (closest first)
        // as tiebreaker so we reconnect to the most trusted, closest peers first.
        if let Some(cache) = close_group_cache {
            let mut sorted_peers: Vec<&CachedCloseGroupPeer> = cache.peers.iter().collect();
            sorted_peers.sort_by(|a, b| {
                // NaN-safe comparison: push NaN scores to the back instead
                // of treating them as equal (which would silently promote
                // corrupted entries to the front of the reconnection queue).
                let score_ord = match b.trust.score.partial_cmp(&a.trust.score) {
                    Some(ord) => ord,
                    None => {
                        if a.trust.score.is_nan() {
                            std::cmp::Ordering::Greater // a is NaN, push to back
                        } else {
                            std::cmp::Ordering::Less // b is NaN, push b to back
                        }
                    }
                };
                score_ord.then_with(|| {
                    let da = self.peer_id.xor_distance(&a.peer_id);
                    let db = self.peer_id.xor_distance(&b.peer_id);
                    da.cmp(&db)
                })
            });

            let mut added_from_close_group = 0usize;
            for peer in &sorted_peers {
                let new_addresses: Vec<MultiAddr> = peer
                    .addresses
                    .iter()
                    .filter(|a| {
                        a.dialable_socket_addr()
                            .is_some_and(|sa| !seen_addresses.contains(&sa))
                    })
                    .cloned()
                    .collect();

                if !new_addresses.is_empty() {
                    for addr in &new_addresses {
                        if let Some(sa) = addr.socket_addr() {
                            seen_addresses.insert(sa);
                        }
                    }
                    serial_addr_sets.push(new_addresses);
                    added_from_close_group += 1;
                }
            }
            if added_from_close_group > 0 {
                info!(
                    "Added {} close group cache peers (highest trust first)",
                    added_from_close_group
                );
            }
        }

        // Priority 1: Configured bootstrap peers.
        if !self.config.bootstrap_peers.is_empty() {
            info!(
                "Using {} configured bootstrap peers (priority)",
                self.config.bootstrap_peers.len()
            );
            for multiaddr in &self.config.bootstrap_peers {
                let Some(socket_addr) = multiaddr.dialable_socket_addr() else {
                    warn!("Skipping non-QUIC bootstrap peer: {}", multiaddr);
                    continue;
                };
                seen_addresses.insert(socket_addr);
                parallel_addr_sets.push(vec![multiaddr.clone()]);
            }
        }

        // Supplement with cached bootstrap peers (after CLI peers)
        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
            let manager = bootstrap_manager.read().await;
            let cached_peers = manager.select_peers(BOOTSTRAP_PEER_BATCH_SIZE).await;
            if !cached_peers.is_empty() {
                let mut added_from_cache = 0;
                for cached in cached_peers {
                    let mut addrs = vec![cached.primary_address];
                    addrs.extend(cached.addresses);
                    // Only add addresses we haven't seen from CLI peers
                    let new_addresses: Vec<MultiAddr> = addrs
                        .into_iter()
                        .filter(|a| !seen_addresses.contains(a))
                        .map(MultiAddr::quic)
                        .collect();

                    if !new_addresses.is_empty() {
                        for addr in &new_addresses {
                            if let Some(sa) = addr.socket_addr() {
                                seen_addresses.insert(sa);
                            }
                        }
                        parallel_addr_sets.push(new_addresses);
                        added_from_cache += 1;
                    }
                }
                if added_from_cache > 0 {
                    info!(
                        "Added {} cached bootstrap peers (supplementing CLI peers)",
                        added_from_cache
                    );
                    used_cache = true;
                }
            }
        }

        if serial_addr_sets.is_empty() && parallel_addr_sets.is_empty() {
            info!("No bootstrap peers configured and no cached peers available");
            return Ok(());
        }

        // Connect to bootstrap peers, wait for identity exchange, then
        // perform DHT peer discovery using the real cryptographic PeerIds.
        let identity_timeout = Duration::from_secs(BOOTSTRAP_IDENTITY_TIMEOUT_SECS);
        let mut successful_connections = 0;
        let mut connected_peer_ids: Vec<PeerId> = Vec::new();

        // Phase A: serial close-group dials to preserve trust-priority ordering.
        let client_mode = matches!(self.config.mode, NodeMode::Client);
        for addrs in &serial_addr_sets {
            if let Some(peer_id) = self
                .dial_bootstrap_addr_set(addrs, used_cache, identity_timeout)
                .await
            {
                successful_connections += 1;
                connected_peer_ids.push(peer_id);
                if client_mode && successful_connections >= CLIENT_BOOTSTRAP_TARGET {
                    debug!(
                        "Client bootstrap target reached ({successful_connections} peers) — skipping remaining serial dials"
                    );
                    break;
                }
            }
        }

        // Phase B: concurrent dials of CLI + cached bootstrap peers, bounded
        // by `MAX_CONCURRENT_BOOTSTRAP_DIALS` to cap simultaneous QUIC+PQC
        // handshakes. Skipped entirely when a client has already hit its
        // target during Phase A.
        if !client_mode || successful_connections < CLIENT_BOOTSTRAP_TARGET {
            let mut parallel_stream =
                futures::stream::iter(parallel_addr_sets.into_iter().map(|addrs| async move {
                    self.dial_bootstrap_addr_set(&addrs, used_cache, identity_timeout)
                        .await
                }))
                .buffer_unordered(MAX_CONCURRENT_BOOTSTRAP_DIALS);
            while let Some(result) = parallel_stream.next().await {
                if let Some(peer_id) = result {
                    successful_connections += 1;
                    connected_peer_ids.push(peer_id);
                    if client_mode && successful_connections >= CLIENT_BOOTSTRAP_TARGET {
                        debug!(
                            "Client bootstrap target reached ({successful_connections} peers) — cancelling pending dials"
                        );
                        break;
                    }
                }
            }
            // `parallel_stream` is dropped here when the `if` block exits,
            // cancelling any in-flight futures inside `buffer_unordered`
            // before we proceed to the DHT discovery phase below.
        }

        if successful_connections == 0 {
            // Outbound connections failed — but for nodes behind symmetric NAT,
            // the bootstrap peer may have already connected INBOUND to us.
            // Wait briefly and check if we have any transport-level connections.
            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
            let transport_peers = self.transport.connected_peers().await;
            if !transport_peers.is_empty() {
                info!(
                    "No outbound bootstrap succeeded, but {} inbound peer(s) connected — proceeding with DHT bootstrap",
                    transport_peers.len()
                );
                connected_peer_ids = transport_peers;
                successful_connections = connected_peer_ids.len();
            } else {
                if !used_cache {
                    warn!("Failed to connect to any bootstrap peers");
                }
                // Starting a node should not be gated on immediate bootstrap connectivity.
                // Keep running and allow background discovery / retries to populate peers later.
                return Ok(());
            }
        }

        info!(
            "Successfully connected to {} bootstrap peers",
            successful_connections
        );

        // Perform DHT peer discovery from connected bootstrap peers.
        match self
            .dht_manager()
            .bootstrap_from_peers(&connected_peer_ids)
            .await
        {
            Ok(count) => info!("DHT peer discovery found {} peers", count),
            Err(e) => warn!("DHT peer discovery failed: {}", e),
        }

        // Perform two consecutive self-lookups to fully refresh the close
        // neighborhood. The second lookup may discover peers that joined or
        // became reachable during the first lookup (Section 11.2 step 5).
        //
        // Client-mode nodes don't serve the DHT, so they don't need an
        // accurate close neighborhood — they just need enough peers to route
        // lookups for their own requests, which `bootstrap_from_peers` above
        // already provides. Skipping the self-lookups here cuts cold-start
        // latency by tens of seconds when α-sized batches include dead peers.
        if matches!(self.config.mode, NodeMode::Node) {
            const SELF_LOOKUP_ROUNDS: u8 = 2;
            for i in 1..=SELF_LOOKUP_ROUNDS {
                if let Err(e) = self.dht_manager().trigger_self_lookup().await {
                    warn!("Post-bootstrap self-lookup {i}/{SELF_LOOKUP_ROUNDS} failed: {e}");
                } else {
                    debug!("Post-bootstrap self-lookup {i}/{SELF_LOOKUP_ROUNDS} completed");
                }
            }
        } else {
            debug!("Skipping post-bootstrap self-lookups (client mode)");
        }

        // Mark node as bootstrapped - we have connected to bootstrap peers
        // and initiated peer discovery
        self.is_bootstrapped.store(true, Ordering::SeqCst);
        info!(
            "Bootstrap complete: connected to {} peers, initiated {} discovery requests",
            successful_connections,
            connected_peer_ids.len()
        );

        // Save close group cache after initial bootstrap so a crash before
        // graceful shutdown still preserves the newly-discovered close group.
        if let Some(ref dir) = self.config.close_group_cache_dir
            && let Err(e) = self.save_close_group_cache(dir).await
        {
            warn!("Failed to save close group cache after bootstrap: {e}");
        }

        Ok(())
    }
2262
2263    /// Dial a single bootstrap peer's address set, stopping at the first
2264    /// address that completes the identity handshake. Records success/failure
2265    /// in the bootstrap cache and returns the remote peer's cryptographic
2266    /// PeerId on success. Safe to call concurrently for different peers.
2267    async fn dial_bootstrap_addr_set(
2268        &self,
2269        addrs: &[MultiAddr],
2270        used_cache: bool,
2271        identity_timeout: Duration,
2272    ) -> Option<PeerId> {
2273        for addr in addrs {
2274            // Bootstrap addresses come from operator-supplied seeds (CLI
2275            // flags, config file, bootstrap cache). The local reachability
2276            // classifier hasn't proven them yet, so log them as
2277            // `Unverified` rather than `unknown`.
2278            match self
2279                .transport
2280                .connect_peer_typed(addr, AddressType::Unverified)
2281                .await
2282            {
2283                Ok(channel_id) => match self
2284                    .transport
2285                    .wait_for_peer_identity(&channel_id, identity_timeout)
2286                    .await
2287                {
2288                    Ok(real_peer_id) => {
2289                        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2290                            let manager = bootstrap_manager.read().await;
2291                            if let Some(sa) = addr.socket_addr() {
2292                                manager.record_success(&sa, 100).await;
2293                            }
2294                        }
2295                        return Some(real_peer_id);
2296                    }
2297                    Err(e) => {
2298                        warn!(
2299                            "Timeout waiting for identity from bootstrap peer {}: {}, \
2300                             closing channel {}",
2301                            addr, e, channel_id
2302                        );
2303                        self.disconnect_channel(&channel_id).await;
2304                    }
2305                },
2306                Err(e) => {
2307                    warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
2308                    if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
2309                        let manager = bootstrap_manager.read().await;
2310                        if let Some(sa) = addr.socket_addr() {
2311                            manager.record_failure(&sa).await;
2312                        }
2313                    }
2314                }
2315            }
2316        }
2317        None
2318    }
2319
2320    /// Persist the current close group peers and their trust scores to disk.
2321    async fn save_close_group_cache(&self, dir: &Path) -> anyhow::Result<()> {
2322        let key: crate::dht::Key = *self.peer_id.as_bytes();
2323        let k_value = self.config.dht_config.k_value;
2324        let close_group = self
2325            .dht_manager()
2326            .find_closest_nodes_local(&key, k_value)
2327            .await;
2328
2329        if close_group.is_empty() {
2330            debug!("No close group peers to save");
2331            return Ok(());
2332        }
2333
2334        let trust_engine = self.adaptive_dht.trust_engine();
2335        let now_epoch = SystemTime::now()
2336            .duration_since(UNIX_EPOCH)
2337            .map(|d| d.as_secs())
2338            .unwrap_or(0);
2339
2340        let peers: Vec<CachedCloseGroupPeer> = close_group
2341            .into_iter()
2342            .filter_map(|dht_node| {
2343                let score = trust_engine.score(&dht_node.peer_id);
2344                // Guard against NaN/Infinity — serde_json cannot round-trip
2345                // non-finite f64 values, which would corrupt the cache file.
2346                if !score.is_finite() {
2347                    return None;
2348                }
2349                Some(CachedCloseGroupPeer {
2350                    peer_id: dht_node.peer_id,
2351                    addresses: dht_node.addresses,
2352                    trust: TrustRecord {
2353                        score,
2354                        last_updated_epoch_secs: now_epoch,
2355                    },
2356                })
2357            })
2358            .collect();
2359
2360        let peer_count = peers.len();
2361        let cache = CloseGroupCache {
2362            peers,
2363            saved_at_epoch_secs: now_epoch,
2364        };
2365
2366        cache.save_to_dir(dir).await?;
2367        info!(
2368            "Saved {} close group peers to cache in {}",
2369            peer_count,
2370            dir.display()
2371        );
2372        Ok(())
2373    }
2374
2375    // disconnect_all_peers and periodic_tasks are now in TransportHandle
2376}
2377
/// Abstraction for sending protocol messages to authenticated peers.
///
/// Per the note below, this is implemented directly on `TransportHandle`;
/// the trait exists so components can depend on the sending capability
/// without naming the concrete transport type.
#[async_trait::async_trait]
#[allow(dead_code)]
pub trait NetworkSender: Send + Sync {
    /// Send a message to an authenticated peer.
    ///
    /// `protocol` selects the application-level handler on the remote side;
    /// `data` is the opaque payload bytes.
    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;

    /// Get our local peer ID (cryptographic identity).
    fn local_peer_id(&self) -> PeerId;
}
2388
2389// P2PNetworkSender removed — NetworkSender is now implemented directly on TransportHandle.
2390// NodeBuilder removed — use NodeConfigBuilder + P2PNode::new() instead.
2391
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod diversity_tests {
    use super::*;
    use crate::security::IPDiversityConfig;

    /// Build a `BootstrapManager` the way production code does, but pointed
    /// at a fresh temp directory so stale cache files cannot interfere.
    ///
    /// NOTE(review): the `TempDir` guard is dropped when this function
    /// returns, which deletes the directory while the returned manager still
    /// references it via `cache_dir`. Fine as long as the test never writes
    /// the cache — confirm if this helper is reused for write paths.
    async fn build_bootstrap_manager_like_prod(config: &NodeConfig) -> BootstrapManager {
        // Use a temp dir to avoid conflicts with cached files from old format
        let temp_dir = tempfile::TempDir::new().expect("temp dir");
        let mut bootstrap_config = config.bootstrap_cache_config.clone().unwrap_or_default();
        bootstrap_config.cache_dir = temp_dir.path().to_path_buf();

        BootstrapManager::with_node_config(bootstrap_config, config)
            .await
            .expect("bootstrap manager")
    }

    /// The `diversity_config` set on `NodeConfig` must flow through to the
    /// bootstrap manager's IP-diversity limits.
    #[tokio::test]
    async fn test_nodeconfig_diversity_config_used_for_bootstrap() {
        let config = NodeConfig {
            diversity_config: Some(IPDiversityConfig::testnet()),
            ..Default::default()
        };

        let manager = build_bootstrap_manager_like_prod(&config).await;
        // Verify testnet config has permissive IP limits
        assert_eq!(manager.diversity_config().max_per_ip, Some(usize::MAX));
        assert_eq!(manager.diversity_config().max_per_subnet, Some(usize::MAX));
    }
}
2422
2423/// Helper function to register a new channel.
2424///
2425/// Sync because the underlying map is a sharded `DashMap` — no `.await` is
2426/// needed to take a write lock. Keeping this sync is what lets the inbound
2427/// accept loop in `TransportHandle` insert without yielding, so it cannot
2428/// stall and back-pressure the upstream handshake channel.
2429pub(crate) fn register_new_channel(
2430    peers: &DashMap<String, PeerInfo>,
2431    channel_id: &str,
2432    remote_addr: &MultiAddr,
2433) {
2434    let peer_info = PeerInfo {
2435        channel_id: channel_id.to_owned(),
2436        addresses: vec![remote_addr.clone()],
2437        connected_at: tokio::time::Instant::now(),
2438        last_seen: tokio::time::Instant::now(),
2439        status: ConnectionStatus::Connected,
2440        protocols: vec!["p2p-core/1.0.0".to_string()],
2441        heartbeat_count: 0,
2442    };
2443    peers.insert(channel_id.to_owned(), peer_info);
2444}
2445
2446#[cfg(test)]
2447mod tests {
2448    use super::*;
2449    // MCP removed from tests
2450    use std::time::Duration;
2451    use tokio::time::timeout;
2452
    /// 2 MiB — used in builder tests to verify max_message_size configuration.
    const TEST_MAX_MESSAGE_SIZE: usize = 2 * 1024 * 1024;

    // Test tool handler for network tests

    // MCP removed

    /// Build a `NodeConfig` suitable for unit tests: loopback-only (`local` +
    /// `allow_loopback`), OS-assigned port (0), no bootstrap peers, and a
    /// short connection timeout so failing dials resolve quickly.
    fn create_test_node_config() -> NodeConfig {
        NodeConfig {
            local: true,
            port: 0,
            ipv6: true,
            bootstrap_peers: vec![],
            connection_timeout: Duration::from_secs(2),
            max_connections: 100,
            dht_config: DHTConfig::default(),
            bootstrap_cache_config: None,
            diversity_config: None,
            max_message_size: None,
            node_identity: None,
            mode: NodeMode::default(),
            custom_user_agent: None,
            allow_loopback: true,
            adaptive_dht_config: AdaptiveDhtConfig::default(),
            close_group_cache_dir: None,
        }
    }

    /// Helper function to create a test tool
    // MCP removed: test tool helper deleted
2484
    /// Default `NodeConfig` exposes dual-stack listeners and production-sized
    /// connection limits.
    #[tokio::test]
    async fn test_node_config_default() {
        let config = NodeConfig::default();

        assert_eq!(config.listen_addrs().len(), 2); // IPv4 + IPv6
        assert_eq!(config.max_connections, 10000);
        assert_eq!(config.connection_timeout, Duration::from_secs(25));
    }

    /// Default DHT parameters use the conventional Kademlia-style values
    /// (k = 20, alpha = 3, 10-minute refresh).
    #[tokio::test]
    async fn test_dht_config_default() {
        let config = DHTConfig::default();

        assert_eq!(config.k_value, 20);
        assert_eq!(config.alpha_value, 3);
        assert_eq!(config.refresh_interval, Duration::from_secs(600));
    }

    /// `ConnectionStatus` variants compare by value, and `Failed` carries its
    /// error message payload.
    #[test]
    fn test_connection_status_variants() {
        let connecting = ConnectionStatus::Connecting;
        let connected = ConnectionStatus::Connected;
        let disconnecting = ConnectionStatus::Disconnecting;
        let disconnected = ConnectionStatus::Disconnected;
        let failed = ConnectionStatus::Failed("test error".to_string());

        assert_eq!(connecting, ConnectionStatus::Connecting);
        assert_eq!(connected, ConnectionStatus::Connected);
        assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
        assert_eq!(disconnected, ConnectionStatus::Disconnected);
        assert_ne!(connecting, connected);

        if let ConnectionStatus::Failed(msg) = failed {
            assert_eq!(msg, "test error");
        } else {
            panic!("Expected Failed status");
        }
    }
2523
    /// A freshly-created node has a valid 32-byte peer ID, is not running,
    /// and has no peers.
    #[tokio::test]
    async fn test_node_creation() -> Result<()> {
        let config = create_test_node_config();
        let node = P2PNode::new(config).await?;

        // PeerId is derived from the cryptographic identity (32-byte BLAKE3 hash)
        assert_eq!(node.peer_id().to_hex().len(), 64);
        assert!(!node.is_running());
        assert_eq!(node.peer_count().await, 0);
        assert!(node.connected_peers().await.is_empty());

        Ok(())
    }

    /// start()/stop() flip `is_running` and starting binds at least one
    /// listen address.
    #[tokio::test]
    async fn test_node_lifecycle() -> Result<()> {
        let config = create_test_node_config();
        let node = P2PNode::new(config).await?;

        // Initially not running
        assert!(!node.is_running());

        // Start the node
        node.start().await?;
        assert!(node.is_running());

        // Check listen addresses were set (at least one)
        let listen_addrs = node.listen_addrs().await;
        assert!(
            !listen_addrs.is_empty(),
            "Expected at least one listening address"
        );

        // Stop the node
        node.stop().await?;
        assert!(!node.is_running());

        Ok(())
    }
2563
    /// An unsigned (no node_identity) connection is tracked at the transport
    /// level only: connect yields a channel ID, the channel shows up in the
    /// transport peer map, and disconnecting removes it.
    #[tokio::test]
    async fn test_peer_connection() -> Result<()> {
        let config1 = create_test_node_config();
        let config2 = create_test_node_config();

        let node1 = P2PNode::new(config1).await?;
        let node2 = P2PNode::new(config2).await?;

        node1.start().await?;
        node2.start().await?;

        // Pick node2's IPv4 listen address to dial.
        let node2_addr = node2
            .listen_addrs()
            .await
            .into_iter()
            .find(|a| a.is_ipv4())
            .ok_or_else(|| {
                P2PError::Network(crate::error::NetworkError::InvalidAddress(
                    "Node 2 did not expose an IPv4 listen address".into(),
                ))
            })?;

        // Connect to a real peer (unsigned — no node_identity configured).
        // connect_peer returns a transport-level channel ID (String), not a PeerId.
        let channel_id = node1.connect_peer(&node2_addr).await?;

        // Unauthenticated connections don't appear in the app-level peer maps.
        // Verify transport-level tracking via is_connection_active / peers map.
        assert!(node1.is_connection_active(&channel_id).await);

        // Get peer info from the transport-level peers map (keyed by channel ID)
        let peer_info = node1.transport.peer_info_by_channel(&channel_id).await;
        assert!(peer_info.is_some());
        let info = peer_info.expect("Peer info should exist after connect");
        assert_eq!(info.channel_id, channel_id);
        assert_eq!(info.status, ConnectionStatus::Connected);
        assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));

        // Disconnect the channel
        node1.remove_channel(&channel_id).await;
        assert!(!node1.is_connection_active(&channel_id).await);

        node1.stop().await?;
        node2.stop().await?;

        Ok(())
    }

    /// TCP multiaddrs are rejected up-front with `InvalidAddress` — no QUIC
    /// dial should ever be attempted for them.
    #[tokio::test]
    async fn test_connect_peer_rejects_tcp_multiaddr() -> Result<()> {
        let config = create_test_node_config();
        let node = P2PNode::new(config).await?;

        let tcp_addr: MultiAddr = "/ip4/127.0.0.1/tcp/1".parse().unwrap();
        let result = node.connect_peer(&tcp_addr).await;

        assert!(
            matches!(
                result,
                Err(P2PError::Network(
                    crate::error::NetworkError::InvalidAddress(_)
                ))
            ),
            "TCP multiaddrs should be rejected before a QUIC dial is attempted, got: {:?}",
            result
        );

        Ok(())
    }
2633
    // TODO(windows): Investigate QUIC connection issues on Windows CI
    // This test consistently fails on Windows GitHub Actions runners with
    // "All connect attempts failed" even with IPv4-only config, long delays,
    // and multiple retry attempts. The underlying saorsa-transport library may have
    // issues on Windows that need investigation.
    // See: https://github.com/dirvine/saorsa-core/issues/TBD
    /// Full authenticated flow: two identity-configured nodes connect,
    /// exchange identities, and the receiver's event stream emits
    /// `PeerConnected` once a signed message authenticates the sender.
    #[cfg_attr(target_os = "windows", ignore)]
    #[tokio::test]
    async fn test_event_subscription() -> Result<()> {
        // PeerConnected/PeerDisconnected only fire for authenticated peers
        // (nodes with node_identity that send signed messages).
        // Configure both nodes with identities so the event subscription test works.
        let identity1 =
            Arc::new(NodeIdentity::generate().expect("should generate identity for test node1"));
        let identity2 =
            Arc::new(NodeIdentity::generate().expect("should generate identity for test node2"));

        let mut config1 = create_test_node_config();
        config1.ipv6 = false;
        config1.node_identity = Some(identity1);

        let node2_peer_id = *identity2.peer_id();
        let mut config2 = create_test_node_config();
        config2.ipv6 = false;
        config2.node_identity = Some(identity2);

        let node1 = P2PNode::new(config1).await?;
        let node2 = P2PNode::new(config2).await?;

        node1.start().await?;
        node2.start().await?;

        // Give both listeners time to come up before dialing.
        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;

        // Subscribe to node2's events (node2 will receive the signed message)
        let mut events = node2.subscribe_events();

        let node2_addr = node2.local_addr().ok_or_else(|| {
            P2PError::Network(crate::error::NetworkError::ProtocolError(
                "No listening address".to_string().into(),
            ))
        })?;

        // Connect node1 → node2, retrying up to 3 times with a short backoff
        // to absorb CI scheduling jitter.
        let mut channel_id = None;
        for attempt in 0..3 {
            if attempt > 0 {
                tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
            }
            match timeout(Duration::from_secs(2), node1.connect_peer(&node2_addr)).await {
                Ok(Ok(id)) => {
                    channel_id = Some(id);
                    break;
                }
                Ok(Err(_)) | Err(_) => continue,
            }
        }
        let channel_id = channel_id.expect("Failed to connect after 3 attempts");

        // Wait for identity exchange to complete via wait_for_peer_identity.
        let target_peer_id = node1
            .wait_for_peer_identity(&channel_id, Duration::from_secs(2))
            .await?;
        assert_eq!(target_peer_id, node2_peer_id);

        // node1 sends a signed message → node2 authenticates → PeerConnected fires on node2
        node1
            .send_message(&target_peer_id, "test-topic", b"hello".to_vec(), &[])
            .await?;

        // Check for PeerConnected event on node2, skipping unrelated events.
        let event = timeout(Duration::from_secs(2), async {
            loop {
                match events.recv().await {
                    Ok(P2PEvent::PeerConnected(id, _)) => return Ok(id),
                    Ok(P2PEvent::Message { .. }) => continue, // skip messages
                    Ok(_) => continue,
                    Err(e) => return Err(e),
                }
            }
        })
        .await;
        assert!(event.is_ok(), "Should receive PeerConnected event");
        let connected_peer_id = event.expect("Timed out").expect("Channel error");
        // The connected peer ID should be node1's app-level ID (a valid PeerId)
        assert!(
            connected_peer_id.0.iter().any(|&b| b != 0),
            "PeerConnected should carry a non-zero peer ID"
        );

        node1.stop().await?;
        node2.stop().await?;

        Ok(())
    }
2729
    // TODO(windows): Same QUIC connection issues as test_event_subscription
    /// Sending to a connected, identity-exchanged peer must not fail with
    /// "not connected"; sending to an unknown peer must fail.
    #[cfg_attr(target_os = "windows", ignore)]
    #[tokio::test]
    async fn test_message_sending() -> Result<()> {
        // Create two nodes (IPv4-only loopback)
        let mut config1 = create_test_node_config();
        config1.ipv6 = false;
        let node1 = P2PNode::new(config1).await?;
        node1.start().await?;

        let mut config2 = create_test_node_config();
        config2.ipv6 = false;
        let node2 = P2PNode::new(config2).await?;
        node2.start().await?;

        // Wait a bit for nodes to start listening
        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

        // Get actual listening address of node2
        let node2_addr = node2.local_addr().ok_or_else(|| {
            P2PError::Network(crate::error::NetworkError::ProtocolError(
                "No listening address".to_string().into(),
            ))
        })?;

        // Connect node1 to node2, bounding the dial so a hang fails fast.
        let channel_id =
            match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
                Ok(res) => res?,
                Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
            };

        // Wait for identity exchange via wait_for_peer_identity.
        let target_peer_id = node1
            .wait_for_peer_identity(&channel_id, Duration::from_secs(2))
            .await?;
        assert_eq!(target_peer_id, node2.peer_id().clone());

        // Send a message
        let message_data = b"Hello, peer!".to_vec();
        let result = match timeout(
            Duration::from_millis(500),
            node1.send_message(&target_peer_id, "test-protocol", message_data, &[]),
        )
        .await
        {
            Ok(res) => res,
            Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
        };
        // For now, we'll just check that we don't get a "not connected" error
        // The actual send might fail due to no handler on the other side
        if let Err(e) = &result {
            assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
        }

        // Try to send to non-existent peer
        let non_existent_peer = PeerId::from_bytes([0xFFu8; 32]);
        let result = node1
            .send_message(&non_existent_peer, "test-protocol", vec![], &[])
            .await;
        assert!(result.is_err(), "Sending to non-existent peer should fail");

        node1.stop().await?;
        node2.stop().await?;

        Ok(())
    }
2797
    /// Historic MCP test, reduced to a lifecycle smoke test after MCP removal.
    #[tokio::test]
    async fn test_remote_mcp_operations() -> Result<()> {
        let config = create_test_node_config();
        let node = P2PNode::new(config).await?;

        // MCP removed; test reduced to simple start/stop
        node.start().await?;
        node.stop().await?;
        Ok(())
    }

    /// `health_check` succeeds even with zero connections.
    #[tokio::test]
    async fn test_health_check() -> Result<()> {
        let config = create_test_node_config();
        let node = P2PNode::new(config).await?;

        // Health check should pass with no connections
        let result = node.health_check().await;
        assert!(result.is_ok());

        // Note: We're not actually connecting to real peers here
        // since that would require running bootstrap nodes.
        // The health check should still pass with no connections.

        Ok(())
    }

    /// `uptime` is monotonically increasing.
    #[tokio::test]
    async fn test_node_uptime() -> Result<()> {
        let config = create_test_node_config();
        let node = P2PNode::new(config).await?;

        let uptime1 = node.uptime();
        // Trivially true for an unsigned Duration; kept as a sanity guard.
        assert!(uptime1 >= Duration::from_secs(0));

        // Wait a bit
        tokio::time::sleep(Duration::from_millis(10)).await;

        let uptime2 = node.uptime();
        assert!(uptime2 > uptime1);

        Ok(())
    }

    /// `config()` exposes the values the node was constructed with.
    #[tokio::test]
    async fn test_node_config_access() -> Result<()> {
        let config = create_test_node_config();
        let node = P2PNode::new(config).await?;

        let node_config = node.config();
        assert_eq!(node_config.max_connections, 100);
        // MCP removed

        Ok(())
    }

    /// Placeholder retained after MCP removal — only checks construction.
    #[tokio::test]
    async fn test_mcp_server_access() -> Result<()> {
        let config = create_test_node_config();
        let _node = P2PNode::new(config).await?;

        // MCP removed
        Ok(())
    }

    /// The DHT handle is unconditionally available on a constructed node.
    #[tokio::test]
    async fn test_dht_access() -> Result<()> {
        let config = create_test_node_config();
        let node = P2PNode::new(config).await?;

        // DHT is always available
        let _dht = node.dht();

        Ok(())
    }
2873
    /// Every builder setter lands in the built `NodeConfig`, and `local(true)`
    /// auto-enables `allow_loopback`.
    #[tokio::test]
    async fn test_node_config_builder() -> Result<()> {
        let bootstrap: MultiAddr = "/ip4/127.0.0.1/udp/9000/quic".parse().unwrap();

        let config = NodeConfig::builder()
            .local(true)
            .ipv6(true)
            .bootstrap_peer(bootstrap)
            .connection_timeout(Duration::from_secs(15))
            .max_connections(200)
            .max_message_size(TEST_MAX_MESSAGE_SIZE)
            .build()?;

        assert_eq!(config.listen_addrs().len(), 2); // IPv4 + IPv6
        assert!(config.local);
        assert!(config.ipv6);
        assert_eq!(config.bootstrap_peers.len(), 1);
        assert_eq!(config.connection_timeout, Duration::from_secs(15));
        assert_eq!(config.max_connections, 200);
        assert_eq!(config.max_message_size, Some(TEST_MAX_MESSAGE_SIZE));
        assert!(config.allow_loopback); // auto-enabled by local(true)

        Ok(())
    }

    /// A node configured with unreachable bootstrap peers still starts and
    /// stops cleanly — bootstrap failure must not be fatal.
    #[tokio::test]
    async fn test_bootstrap_peers() -> Result<()> {
        let mut config = create_test_node_config();
        config.bootstrap_peers = vec![
            crate::MultiAddr::from_ipv4(std::net::Ipv4Addr::LOCALHOST, 9200),
            crate::MultiAddr::from_ipv4(std::net::Ipv4Addr::LOCALHOST, 9201),
        ];

        let node = P2PNode::new(config).await?;

        // Start node (which attempts to connect to bootstrap peers)
        node.start().await?;

        // In a test environment, bootstrap peers may not be available
        // The test verifies the node starts correctly with bootstrap configuration
        // Peer count may include local/internal tracking, so we just verify it's reasonable
        let _peer_count = node.peer_count().await;

        node.stop().await?;
        Ok(())
    }
2920
    /// `PeerInfo` fields round-trip through construction unchanged.
    #[tokio::test]
    async fn test_peer_info_structure() {
        let peer_info = PeerInfo {
            channel_id: "test_peer".to_string(),
            addresses: vec!["/ip4/127.0.0.1/tcp/9000".parse::<MultiAddr>().unwrap()],
            connected_at: Instant::now(),
            last_seen: Instant::now(),
            status: ConnectionStatus::Connected,
            protocols: vec!["test-protocol".to_string()],
            heartbeat_count: 0,
        };

        assert_eq!(peer_info.channel_id, "test_peer");
        assert_eq!(peer_info.addresses.len(), 1);
        assert_eq!(peer_info.status, ConnectionStatus::Connected);
        assert_eq!(peer_info.protocols.len(), 1);
    }

    /// `NodeConfig` round-trips through serde_json without losing fields.
    #[tokio::test]
    async fn test_serialization() -> Result<()> {
        // Test that configs can be serialized/deserialized
        let config = create_test_node_config();
        let serialized = serde_json::to_string(&config)?;
        let deserialized: NodeConfig = serde_json::from_str(&serialized)?;

        assert_eq!(config.local, deserialized.local);
        assert_eq!(config.port, deserialized.port);
        assert_eq!(config.ipv6, deserialized.ipv6);
        assert_eq!(config.bootstrap_peers, deserialized.bootstrap_peers);

        Ok(())
    }
2953
    /// An injected peer is found by an equal (but separately-constructed)
    /// address value.
    #[tokio::test]
    async fn test_get_channel_id_by_address_found() -> Result<()> {
        let config = create_test_node_config();
        let node = P2PNode::new(config).await?;

        // Manually insert a peer for testing
        let test_channel_id = "peer_test_123".to_string();
        let test_address = "192.168.1.100:9000";
        let test_multiaddr = MultiAddr::quic(test_address.parse().unwrap());

        let peer_info = PeerInfo {
            channel_id: test_channel_id.clone(),
            addresses: vec![test_multiaddr],
            connected_at: Instant::now(),
            last_seen: Instant::now(),
            status: ConnectionStatus::Connected,
            protocols: vec!["test-protocol".to_string()],
            heartbeat_count: 0,
        };

        node.transport
            .inject_peer(test_channel_id.clone(), peer_info)
            .await;

        // Test: Find channel by address
        let lookup_addr = MultiAddr::quic(test_address.parse().unwrap());
        let found_channel_id = node.get_channel_id_by_address(&lookup_addr).await;
        assert_eq!(found_channel_id, Some(test_channel_id));

        Ok(())
    }

    /// Looking up an address that was never registered returns `None`.
    #[tokio::test]
    async fn test_get_channel_id_by_address_not_found() -> Result<()> {
        let config = create_test_node_config();
        let node = P2PNode::new(config).await?;

        // Test: Try to find a channel that doesn't exist
        let unknown_addr = MultiAddr::quic("192.168.1.200:9000".parse().unwrap());
        let result = node.get_channel_id_by_address(&unknown_addr).await;
        assert_eq!(result, None);

        Ok(())
    }

    /// Non-IP transports (e.g. BLE) have no socket address and therefore
    /// never match — lookup returns `None` rather than erroring.
    #[tokio::test]
    async fn test_get_channel_id_by_address_invalid_format() -> Result<()> {
        let config = create_test_node_config();
        let node = P2PNode::new(config).await?;

        // Test: Non-IP address should return None (no matching socket addr)
        let ble_addr = MultiAddr::new(crate::address::TransportAddr::Ble {
            mac: [0x02, 0x00, 0x00, 0x00, 0x00, 0x01],
            psm: 0x0025,
        });
        let result = node.get_channel_id_by_address(&ble_addr).await;
        assert_eq!(result, None);

        Ok(())
    }
3014
    /// With several registered peers, each distinct address resolves to its
    /// own channel — no cross-matching.
    #[tokio::test]
    async fn test_get_channel_id_by_address_multiple_peers() -> Result<()> {
        let config = create_test_node_config();
        let node = P2PNode::new(config).await?;

        // Add multiple peers with different addresses
        let peer1_id = "peer_1".to_string();
        let peer1_addr_str = "192.168.1.101:9001";
        let peer1_multiaddr = MultiAddr::quic(peer1_addr_str.parse().unwrap());

        let peer2_id = "peer_2".to_string();
        let peer2_addr_str = "192.168.1.102:9002";
        let peer2_multiaddr = MultiAddr::quic(peer2_addr_str.parse().unwrap());

        let peer1_info = PeerInfo {
            channel_id: peer1_id.clone(),
            addresses: vec![peer1_multiaddr],
            connected_at: Instant::now(),
            last_seen: Instant::now(),
            status: ConnectionStatus::Connected,
            protocols: vec!["test-protocol".to_string()],
            heartbeat_count: 0,
        };

        let peer2_info = PeerInfo {
            channel_id: peer2_id.clone(),
            addresses: vec![peer2_multiaddr],
            connected_at: Instant::now(),
            last_seen: Instant::now(),
            status: ConnectionStatus::Connected,
            protocols: vec!["test-protocol".to_string()],
            heartbeat_count: 0,
        };

        node.transport
            .inject_peer(peer1_id.clone(), peer1_info)
            .await;
        node.transport
            .inject_peer(peer2_id.clone(), peer2_info)
            .await;

        // Test: Find each channel by their unique address
        let found_peer1 = node
            .get_channel_id_by_address(&MultiAddr::quic(peer1_addr_str.parse().unwrap()))
            .await;
        let found_peer2 = node
            .get_channel_id_by_address(&MultiAddr::quic(peer2_addr_str.parse().unwrap()))
            .await;

        assert_eq!(found_peer1, Some(peer1_id));
        assert_eq!(found_peer2, Some(peer2_id));

        Ok(())
    }

    /// A fresh node reports no active connections.
    #[tokio::test]
    async fn test_list_active_connections_empty() -> Result<()> {
        let config = create_test_node_config();
        let node = P2PNode::new(config).await?;

        // Test: No connections initially
        let connections = node.list_active_connections().await;
        assert!(connections.is_empty());

        Ok(())
    }
3081
3082    #[tokio::test]
3083    async fn test_list_active_connections_with_peers() -> Result<()> {
3084        let config = create_test_node_config();
3085        let node = P2PNode::new(config).await?;
3086
3087        // Add multiple peers
3088        let peer1_id = "peer_1".to_string();
3089        let peer1_addrs = vec![
3090            MultiAddr::quic("192.168.1.101:9001".parse().unwrap()),
3091            MultiAddr::quic("192.168.1.101:9002".parse().unwrap()),
3092        ];
3093
3094        let peer2_id = "peer_2".to_string();
3095        let peer2_addrs = vec![MultiAddr::quic("192.168.1.102:9003".parse().unwrap())];
3096
3097        let peer1_info = PeerInfo {
3098            channel_id: peer1_id.clone(),
3099            addresses: peer1_addrs.clone(),
3100            connected_at: Instant::now(),
3101            last_seen: Instant::now(),
3102            status: ConnectionStatus::Connected,
3103            protocols: vec!["test-protocol".to_string()],
3104            heartbeat_count: 0,
3105        };
3106
3107        let peer2_info = PeerInfo {
3108            channel_id: peer2_id.clone(),
3109            addresses: peer2_addrs.clone(),
3110            connected_at: Instant::now(),
3111            last_seen: Instant::now(),
3112            status: ConnectionStatus::Connected,
3113            protocols: vec!["test-protocol".to_string()],
3114            heartbeat_count: 0,
3115        };
3116
3117        node.transport
3118            .inject_peer(peer1_id.clone(), peer1_info)
3119            .await;
3120        node.transport
3121            .inject_peer(peer2_id.clone(), peer2_info)
3122            .await;
3123
3124        // Also add to active_connections (list_active_connections iterates over this)
3125        node.transport
3126            .inject_active_connection(peer1_id.clone())
3127            .await;
3128        node.transport
3129            .inject_active_connection(peer2_id.clone())
3130            .await;
3131
3132        // Test: List all active connections
3133        let connections = node.list_active_connections().await;
3134        assert_eq!(connections.len(), 2);
3135
3136        // Verify peer1 and peer2 are in the list
3137        let peer1_conn = connections.iter().find(|(id, _)| id == &peer1_id);
3138        let peer2_conn = connections.iter().find(|(id, _)| id == &peer2_id);
3139
3140        assert!(peer1_conn.is_some());
3141        assert!(peer2_conn.is_some());
3142
3143        // Verify addresses match
3144        assert_eq!(peer1_conn.unwrap().1, peer1_addrs);
3145        assert_eq!(peer2_conn.unwrap().1, peer2_addrs);
3146
3147        Ok(())
3148    }
3149
3150    #[tokio::test]
3151    async fn test_remove_channel_success() -> Result<()> {
3152        let config = create_test_node_config();
3153        let node = P2PNode::new(config).await?;
3154
3155        // Add a peer
3156        let channel_id = "peer_to_remove".to_string();
3157        let channel_peer_id = PeerId::from_name(&channel_id);
3158        let peer_info = PeerInfo {
3159            channel_id: channel_id.clone(),
3160            addresses: vec![MultiAddr::quic("192.168.1.100:9000".parse().unwrap())],
3161            connected_at: Instant::now(),
3162            last_seen: Instant::now(),
3163            status: ConnectionStatus::Connected,
3164            protocols: vec!["test-protocol".to_string()],
3165            heartbeat_count: 0,
3166        };
3167
3168        node.transport
3169            .inject_peer(channel_id.clone(), peer_info)
3170            .await;
3171        node.transport
3172            .inject_peer_to_channel(channel_peer_id, channel_id.clone())
3173            .await;
3174
3175        // Verify peer exists
3176        assert!(node.is_peer_connected(&channel_peer_id).await);
3177
3178        // Remove the channel
3179        let removed = node.remove_channel(&channel_id).await;
3180        assert!(removed);
3181
3182        // Verify peer no longer exists
3183        assert!(!node.is_peer_connected(&channel_peer_id).await);
3184
3185        Ok(())
3186    }
3187
3188    #[tokio::test]
3189    async fn test_remove_channel_nonexistent() -> Result<()> {
3190        let config = create_test_node_config();
3191        let node = P2PNode::new(config).await?;
3192
3193        // Try to remove a channel that doesn't exist
3194        let removed = node.remove_channel("nonexistent_peer").await;
3195        assert!(!removed);
3196
3197        Ok(())
3198    }
3199
3200    #[tokio::test]
3201    async fn test_is_peer_connected() -> Result<()> {
3202        let config = create_test_node_config();
3203        let node = P2PNode::new(config).await?;
3204
3205        let channel_id = "test_peer".to_string();
3206        let channel_peer_id = PeerId::from_name(&channel_id);
3207
3208        // Initially not connected
3209        assert!(!node.is_peer_connected(&channel_peer_id).await);
3210
3211        // Add peer
3212        let peer_info = PeerInfo {
3213            channel_id: channel_id.clone(),
3214            addresses: vec![MultiAddr::quic("192.168.1.100:9000".parse().unwrap())],
3215            connected_at: Instant::now(),
3216            last_seen: Instant::now(),
3217            status: ConnectionStatus::Connected,
3218            protocols: vec!["test-protocol".to_string()],
3219            heartbeat_count: 0,
3220        };
3221
3222        node.transport
3223            .inject_peer(channel_id.clone(), peer_info)
3224            .await;
3225        node.transport
3226            .inject_peer_to_channel(channel_peer_id, channel_id.clone())
3227            .await;
3228
3229        // Now connected
3230        assert!(node.is_peer_connected(&channel_peer_id).await);
3231
3232        // Remove channel
3233        node.remove_channel(&channel_id).await;
3234
3235        // No longer connected
3236        assert!(!node.is_peer_connected(&channel_peer_id).await);
3237
3238        Ok(())
3239    }
3240
3241    #[test]
3242    fn test_normalize_ipv6_wildcard() {
3243        use std::net::{IpAddr, Ipv6Addr, SocketAddr};
3244
3245        let wildcard = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 8080);
3246        let normalized = normalize_wildcard_to_loopback(wildcard);
3247
3248        assert_eq!(normalized.ip(), IpAddr::V6(Ipv6Addr::LOCALHOST));
3249        assert_eq!(normalized.port(), 8080);
3250    }
3251
3252    #[test]
3253    fn test_normalize_ipv4_wildcard() {
3254        use std::net::{IpAddr, Ipv4Addr, SocketAddr};
3255
3256        let wildcard = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9000);
3257        let normalized = normalize_wildcard_to_loopback(wildcard);
3258
3259        assert_eq!(normalized.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
3260        assert_eq!(normalized.port(), 9000);
3261    }
3262
3263    #[test]
3264    fn test_normalize_specific_address_unchanged() {
3265        let specific: std::net::SocketAddr = "192.168.1.100:3000".parse().unwrap();
3266        let normalized = normalize_wildcard_to_loopback(specific);
3267
3268        assert_eq!(normalized, specific);
3269    }
3270
3271    #[test]
3272    fn test_normalize_loopback_unchanged() {
3273        use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
3274
3275        let loopback_v6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5000);
3276        let normalized_v6 = normalize_wildcard_to_loopback(loopback_v6);
3277        assert_eq!(normalized_v6, loopback_v6);
3278
3279        let loopback_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
3280        let normalized_v4 = normalize_wildcard_to_loopback(loopback_v4);
3281        assert_eq!(normalized_v4, loopback_v4);
3282    }
3283
3284    // ---- parse_protocol_message regression tests ----
3285
3286    /// Get current Unix timestamp for tests
3287    fn current_timestamp() -> u64 {
3288        std::time::SystemTime::now()
3289            .duration_since(std::time::UNIX_EPOCH)
3290            .map(|d| d.as_secs())
3291            .unwrap_or(0)
3292    }
3293
3294    /// Helper to create a postcard-serialized WireMessage for tests
3295    fn make_wire_bytes(protocol: &str, data: Vec<u8>, from: &str, timestamp: u64) -> Vec<u8> {
3296        let msg = WireMessage {
3297            protocol: protocol.to_string(),
3298            data,
3299            from: PeerId::from_name(from),
3300            timestamp,
3301            user_agent: String::new(),
3302            public_key: Vec::new(),
3303            signature: Vec::new(),
3304        };
3305        postcard::to_stdvec(&msg).unwrap()
3306    }
3307
3308    #[test]
3309    fn test_parse_protocol_message_uses_transport_peer_id_as_source() {
3310        // Regression: For unsigned messages, P2PEvent::Message.source must be the
3311        // transport peer ID, NOT the "from" field from the wire message.
3312        let transport_id = "abcdef0123456789";
3313        let logical_id = "spoofed-logical-id";
3314        let bytes = make_wire_bytes("test/v1", vec![1, 2, 3], logical_id, current_timestamp());
3315
3316        let parsed =
3317            parse_protocol_message(&bytes, transport_id).expect("valid message should parse");
3318
3319        // Unsigned message: no authenticated node ID
3320        assert!(parsed.authenticated_node_id.is_none());
3321
3322        match parsed.event {
3323            P2PEvent::Message {
3324                topic,
3325                source,
3326                data,
3327            } => {
3328                assert!(source.is_none(), "unsigned message source must be None");
3329                assert_eq!(topic, "test/v1");
3330                assert_eq!(data, vec![1u8, 2, 3]);
3331            }
3332            other => panic!("expected P2PEvent::Message, got {:?}", other),
3333        }
3334    }
3335
    #[test]
    fn test_parse_protocol_message_rejects_invalid_bytes() {
        // Arbitrary bytes that do not postcard-decode into a WireMessage must be
        // rejected with None (the wire format is postcard — see make_wire_bytes —
        // not bincode; the literal below is just garbage input).
        assert!(parse_protocol_message(b"not valid bincode", "peer-id").is_none());
    }
3341
    #[test]
    fn test_parse_protocol_message_rejects_truncated_message() {
        // A message cut off mid-stream must fail postcard deserialization
        // (the wire format is postcard, not bincode) and be rejected as None.
        let full_bytes = make_wire_bytes("test/v1", vec![1, 2, 3], "sender", current_timestamp());
        let truncated = &full_bytes[..full_bytes.len() / 2];
        assert!(parse_protocol_message(truncated, "peer-id").is_none());
    }
3349
3350    #[test]
3351    fn test_parse_protocol_message_empty_payload() {
3352        let bytes = make_wire_bytes("ping", vec![], "sender", current_timestamp());
3353
3354        let parsed = parse_protocol_message(&bytes, "transport-peer")
3355            .expect("valid message with empty data should parse");
3356
3357        match parsed.event {
3358            P2PEvent::Message { data, .. } => assert!(data.is_empty()),
3359            other => panic!("expected P2PEvent::Message, got {:?}", other),
3360        }
3361    }
3362
3363    #[test]
3364    fn test_parse_protocol_message_preserves_binary_payload() {
3365        // Verify that arbitrary byte values (including 0xFF, 0x00) survive round-trip
3366        let payload: Vec<u8> = (0..=255).collect();
3367        let bytes = make_wire_bytes("binary/v1", payload.clone(), "sender", current_timestamp());
3368
3369        let parsed = parse_protocol_message(&bytes, "peer-id")
3370            .expect("valid message with full byte range should parse");
3371
3372        match parsed.event {
3373            P2PEvent::Message { data, topic, .. } => {
3374                assert_eq!(topic, "binary/v1");
3375                assert_eq!(
3376                    data, payload,
3377                    "payload must survive bincode round-trip exactly"
3378                );
3379            }
3380            other => panic!("expected P2PEvent::Message, got {:?}", other),
3381        }
3382    }
3383
3384    #[test]
3385    fn test_parse_signed_message_verifies_and_uses_node_id() {
3386        let identity = NodeIdentity::generate().expect("should generate identity");
3387        let protocol = "test/signed";
3388        let data: Vec<u8> = vec![10, 20, 30];
3389        // The `from` field must match the PeerId derived from the public key.
3390        let from = *identity.peer_id();
3391        let timestamp = current_timestamp();
3392        let user_agent = "test/1.0";
3393
3394        // Compute signable bytes the same way create_protocol_message does
3395        let signable =
3396            postcard::to_stdvec(&(protocol, data.as_slice(), &from, timestamp, user_agent))
3397                .unwrap();
3398        let sig = identity.sign(&signable).expect("signing should succeed");
3399
3400        let msg = WireMessage {
3401            protocol: protocol.to_string(),
3402            data: data.clone(),
3403            from,
3404            timestamp,
3405            user_agent: user_agent.to_string(),
3406            public_key: identity.public_key().as_bytes().to_vec(),
3407            signature: sig.as_bytes().to_vec(),
3408        };
3409        let bytes = postcard::to_stdvec(&msg).unwrap();
3410
3411        let parsed =
3412            parse_protocol_message(&bytes, "transport-xyz").expect("signed message should parse");
3413
3414        let expected_peer_id = *identity.peer_id();
3415        assert_eq!(
3416            parsed.authenticated_node_id.as_ref(),
3417            Some(&expected_peer_id)
3418        );
3419
3420        match parsed.event {
3421            P2PEvent::Message { source, .. } => {
3422                assert_eq!(
3423                    source.as_ref(),
3424                    Some(&expected_peer_id),
3425                    "source should be the verified PeerId"
3426                );
3427            }
3428            other => panic!("expected P2PEvent::Message, got {:?}", other),
3429        }
3430    }
3431
3432    #[test]
3433    fn test_parse_message_with_bad_signature_is_rejected() {
3434        let identity = NodeIdentity::generate().expect("should generate identity");
3435        let protocol = "test/bad-sig";
3436        let data: Vec<u8> = vec![1, 2, 3];
3437        let from = *identity.peer_id();
3438        let timestamp = current_timestamp();
3439        let user_agent = "test/1.0";
3440
3441        // Sign correct signable bytes
3442        let signable =
3443            postcard::to_stdvec(&(protocol, data.as_slice(), &from, timestamp, user_agent))
3444                .unwrap();
3445        let sig = identity.sign(&signable).expect("signing should succeed");
3446
3447        // Tamper with the data (signature was over [1,2,3], not [99,99,99])
3448        let msg = WireMessage {
3449            protocol: protocol.to_string(),
3450            data: vec![99, 99, 99],
3451            from,
3452            timestamp,
3453            user_agent: user_agent.to_string(),
3454            public_key: identity.public_key().as_bytes().to_vec(),
3455            signature: sig.as_bytes().to_vec(),
3456        };
3457        let bytes = postcard::to_stdvec(&msg).unwrap();
3458
3459        assert!(
3460            parse_protocol_message(&bytes, "transport-xyz").is_none(),
3461            "message with bad signature should be rejected"
3462        );
3463    }
3464
3465    #[test]
3466    fn test_parse_message_with_mismatched_from_is_rejected() {
3467        let identity = NodeIdentity::generate().expect("should generate identity");
3468        let protocol = "test/from-mismatch";
3469        let data: Vec<u8> = vec![1, 2, 3];
3470        // Use a `from` field that does NOT match the public key's PeerId.
3471        let fake_from = PeerId::from_bytes([0xDE; 32]);
3472        let timestamp = current_timestamp();
3473        let user_agent = "test/1.0";
3474
3475        let signable =
3476            postcard::to_stdvec(&(protocol, data.as_slice(), &fake_from, timestamp, user_agent))
3477                .unwrap();
3478        let sig = identity.sign(&signable).expect("signing should succeed");
3479
3480        let msg = WireMessage {
3481            protocol: protocol.to_string(),
3482            data,
3483            from: fake_from,
3484            timestamp,
3485            user_agent: user_agent.to_string(),
3486            public_key: identity.public_key().as_bytes().to_vec(),
3487            signature: sig.as_bytes().to_vec(),
3488        };
3489        let bytes = postcard::to_stdvec(&msg).unwrap();
3490
3491        assert!(
3492            parse_protocol_message(&bytes, "transport-xyz").is_none(),
3493            "message with mismatched from field should be rejected"
3494        );
3495    }
3496}