// ant_quic/node.rs
1// Copyright 2024 Saorsa Labs Ltd.
2//
3// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
4// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
5//
6// Full details available at https://saorsalabs.com/licenses
7
8//! Zero-configuration P2P node
9//!
10//! This module provides [`Node`] - the simple API for creating P2P nodes
11//! that work out of the box with zero configuration. Every node automatically:
12//!
13//! - Uses 100% post-quantum cryptography (ML-KEM-768)
14//! - Works behind any NAT via native QUIC hole punching
15//! - Offers relay/bootstrap/coordinator capability hints by default
16//! - Exposes a practical status snapshot via [`NodeStatus`]
17//!
18//! # Zero Configuration
19//!
20//! ```rust,ignore
21//! use ant_quic::Node;
22//!
23//! #[tokio::main]
24//! async fn main() -> anyhow::Result<()> {
25//!     // Create a node - that's it!
26//!     let node = Node::new().await?;
27//!
28//!     println!("I am: {:?}", node.peer_id());
29//!     println!("Listening on: {:?}", node.local_addr());
30//!
31//!     // Check status
32//!     let status = node.status().await;
33//!     println!("NAT behavior hint: {}", status.nat_type);
34//!     println!("Can receive direct: {}", status.can_receive_direct);
35//!     println!("Acting as relay: {}", status.is_relaying);
36//!
37//!     // Connect to a peer
38//!     let conn = node.connect_addr("quic.saorsalabs.com:9000".parse()?).await?;
39//!
40//!     // Accept connections
41//!     let incoming = node.accept().await;
42//!
43//!     Ok(())
44//! }
45//! ```
46
47use std::net::SocketAddr;
48use std::sync::Arc;
49use std::time::{Duration, Instant};
50
51use crate::bootstrap_cache::PeerCapabilities;
52use crate::crypto::pqc::types::{MlDsaPublicKey, MlDsaSecretKey};
53use tokio::sync::broadcast;
54use tracing::info;
55
56use crate::host_identity::HostIdentity;
57use crate::nat_traversal_api::PeerId;
58use crate::node_config::NodeConfig;
59use crate::node_event::NodeEvent;
60use crate::node_status::{NatType, NodeStatus};
61use crate::p2p_endpoint::{
62    ConnectionHealth, EndpointError, P2pEndpoint, P2pEvent, PeerConnection, PeerLifecycleEvent,
63};
64use crate::reachability::{DIRECT_REACHABILITY_TTL, socket_addr_scope};
65use crate::unified_config::P2pConfig;
66use crate::unified_config::load_or_generate_endpoint_keypair;
67
/// Error type for Node operations
///
/// Wraps the failure modes surfaced by the [`Node`] facade. Endpoint-level
/// failures convert automatically via `#[from]`, so `?` works on
/// `Result<_, EndpointError>` inside `Node` methods.
#[derive(Debug, thiserror::Error)]
pub enum NodeError {
    /// Failed to create node (configuration, keypair loading, or endpoint setup)
    #[error("Failed to create node: {0}")]
    Creation(String),

    /// Connection error reported at the node level
    #[error("Connection error: {0}")]
    Connection(String),

    /// Endpoint error bubbled up from the inner [`P2pEndpoint`]
    #[error("Endpoint error: {0}")]
    Endpoint(#[from] EndpointError),

    /// The node is shutting down and can no longer service requests
    #[error("Node is shutting down")]
    ShuttingDown,
}
87
88/// Zero-configuration P2P node
89///
90/// This is the primary API for ant-quic. Create a node with zero configuration
91/// and it will automatically handle NAT traversal, post-quantum cryptography,
92/// and peer discovery.
93///
94/// # Symmetric P2P
95///
96/// All nodes are equal - every node can:
97/// - Connect to other nodes
98/// - Accept incoming connections
99/// - Act as coordinator for NAT traversal
100/// - Act as relay for peers behind restrictive NATs
101///
102/// # Post-Quantum Security
103///
104/// v0.2: Every connection uses pure post-quantum cryptography:
105/// - Key Exchange: ML-KEM-768 (FIPS 203)
106/// - Authentication: ML-DSA-65 (FIPS 204)
107/// - Ed25519 is used ONLY for the 32-byte PeerId compact identifier
108///
109/// There is no classical crypto fallback - security is quantum-resistant by default.
110///
111/// # Example
112///
113/// ```rust,ignore
114/// use ant_quic::Node;
115///
116/// // Zero configuration
117/// let node = Node::new().await?;
118///
119/// // Or with known peers
120/// let node = Node::with_peers(vec!["quic.saorsalabs.com:9000".parse()?]).await?;
121///
122/// // Or with persistent identity
123/// let keypair = load_keypair()?;
124/// let node = Node::with_keypair(keypair).await?;
125/// ```
pub struct Node {
    /// Inner P2pEndpoint providing connectivity, NAT traversal, and crypto
    inner: Arc<P2pEndpoint>,

    /// Instant captured at construction; `status()` reports `uptime` from it
    start_time: Instant,

    /// Broadcast sender for unified [`NodeEvent`]s; `subscribe()` hands out receivers
    event_tx: broadcast::Sender<NodeEvent>,
}
136
137impl std::fmt::Debug for Node {
138    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139        f.debug_struct("Node")
140            .field("peer_id", &self.peer_id())
141            .field("local_addr", &self.local_addr())
142            .finish_non_exhaustive()
143    }
144}
145
146impl Node {
147    // === Creation ===
148
149    /// Create a node with automatic configuration
150    ///
151    /// This is the recommended way to create a node. It will:
152    /// - Bind to a random port on all interfaces (0.0.0.0:0)
153    /// - Generate a fresh Ed25519 keypair
154    /// - Enable all NAT traversal capabilities
155    /// - Use 100% post-quantum cryptography
156    ///
157    /// # Example
158    ///
159    /// ```rust,ignore
160    /// let node = Node::new().await?;
161    /// ```
162    pub async fn new() -> Result<Self, NodeError> {
163        Self::with_config(NodeConfig::default()).await
164    }
165
166    /// Create a node with a specific bind address
167    ///
168    /// Use this when you need a specific port for firewall rules or port forwarding.
169    ///
170    /// # Example
171    ///
172    /// ```rust,ignore
173    /// let node = Node::bind("0.0.0.0:9000".parse()?).await?;
174    /// ```
175    pub async fn bind(addr: SocketAddr) -> Result<Self, NodeError> {
176        Self::with_config(NodeConfig::with_bind_addr(addr)).await
177    }
178
179    /// Create a node with known peers
180    ///
181    /// Use this when you have a list of known peers to connect to initially.
182    /// These can be any nodes in the network - they'll help with NAT traversal.
183    ///
184    /// # Example
185    ///
186    /// ```rust,ignore
187    /// let node = Node::with_peers(vec![
188    ///     "quic.saorsalabs.com:9000".parse()?,
189    ///     "peer2.example.com:9000".parse()?,
190    /// ]).await?;
191    /// ```
192    pub async fn with_peers(peers: Vec<SocketAddr>) -> Result<Self, NodeError> {
193        Self::with_config(NodeConfig::with_known_peers(peers)).await
194    }
195
196    /// Create a node with an existing keypair
197    ///
198    /// Use this for persistent identity across restarts. The peer ID
199    /// is derived from the public key, so using the same keypair
200    /// gives you the same peer ID.
201    ///
202    /// # Example
203    ///
204    /// ```rust,ignore
205    /// let (public_key, secret_key) = load_keypair_from_file("~/.ant-quic/identity.key")?;
206    /// let node = Node::with_keypair(public_key, secret_key).await?;
207    /// ```
208    pub async fn with_keypair(
209        public_key: MlDsaPublicKey,
210        secret_key: MlDsaSecretKey,
211    ) -> Result<Self, NodeError> {
212        Self::with_config(NodeConfig::with_keypair(public_key, secret_key)).await
213    }
214
215    /// Create a node with a HostIdentity for persistent encrypted identity
216    ///
217    /// This is the recommended way to create a node with persistent identity.
218    /// The keypair is encrypted at rest using a key derived from the HostIdentity.
219    ///
220    /// # Arguments
221    ///
222    /// * `host` - The HostIdentity for key derivation
223    /// * `network_id` - Network identifier for per-network keypair isolation
224    /// * `storage_dir` - Directory to store the encrypted keypair
225    ///
226    /// # Example
227    ///
228    /// ```rust,ignore
229    /// use ant_quic::{Node, HostIdentity};
230    ///
231    /// let host = HostIdentity::generate();
232    /// let node = Node::with_host_identity(
233    ///     &host,
234    ///     b"my-network",
235    ///     "/var/lib/ant-quic",
236    /// ).await?;
237    /// ```
238    pub async fn with_host_identity(
239        host: &HostIdentity,
240        network_id: &[u8],
241        storage_dir: impl AsRef<std::path::Path>,
242    ) -> Result<Self, NodeError> {
243        let (public_key, secret_key) =
244            load_or_generate_endpoint_keypair(host, network_id, storage_dir.as_ref()).map_err(
245                |e| NodeError::Creation(format!("Failed to load/generate keypair: {e}")),
246            )?;
247
248        Self::with_keypair(public_key, secret_key).await
249    }
250
251    /// Create a node with full configuration
252    ///
253    /// For power users who need specific settings. Most applications
254    /// should use `Node::new()` or one of the convenience methods.
255    ///
256    /// # Example
257    ///
258    /// ```rust,ignore
259    /// let config = NodeConfig::builder()
260    ///     .bind_addr("0.0.0.0:9000".parse()?)
261    ///     .known_peer("quic.saorsalabs.com:9000".parse()?)
262    ///     .keypair(load_keypair()?)
263    ///     .build();
264    ///
265    /// let node = Node::with_config(config).await?;
266    /// ```
267    pub async fn with_config(config: NodeConfig) -> Result<Self, NodeError> {
268        // Convert NodeConfig to P2pConfig
269        let mut p2p_config = P2pConfig::default();
270
271        // Build transport registry first (before any partial moves)
272        p2p_config.transport_registry = config.build_transport_registry();
273
274        if let Some(bind_addr) = config.bind_addr {
275            p2p_config.bind_addr = Some(bind_addr.into());
276        }
277
278        p2p_config.known_peers = config.known_peers.into_iter().map(Into::into).collect();
279        p2p_config.keypair = config.keypair;
280
281        if let Some(capacity) = config.data_channel_capacity {
282            p2p_config.data_channel_capacity = capacity;
283        }
284        if let Some(streams) = config.max_concurrent_uni_streams {
285            p2p_config.max_concurrent_uni_streams = streams;
286        }
287
288        // Create event channel
289        let (event_tx, _) = broadcast::channel(256);
290
291        // Create P2pEndpoint
292        let endpoint = P2pEndpoint::new(p2p_config)
293            .await
294            .map_err(NodeError::Endpoint)?;
295
296        info!("Node created with peer ID: {:?}", endpoint.peer_id());
297
298        let inner = Arc::new(endpoint);
299
300        // Spawn event bridge task to forward P2pEvent -> NodeEvent
301        Self::spawn_event_bridge(Arc::clone(&inner), event_tx.clone());
302
303        Ok(Self {
304            inner,
305            start_time: Instant::now(),
306            event_tx,
307        })
308    }
309
310    /// Spawn a background task to bridge P2pEvents to NodeEvents
311    fn spawn_event_bridge(endpoint: Arc<P2pEndpoint>, event_tx: broadcast::Sender<NodeEvent>) {
312        let mut p2p_events = endpoint.subscribe();
313
314        tokio::spawn(async move {
315            loop {
316                match p2p_events.recv().await {
317                    Ok(p2p_event) => {
318                        if let Some(node_event) = Self::convert_event(p2p_event) {
319                            // Ignore send errors - means no subscribers
320                            let _ = event_tx.send(node_event);
321                        }
322                    }
323                    Err(broadcast::error::RecvError::Closed) => {
324                        // Channel closed, endpoint shutting down
325                        break;
326                    }
327                    Err(broadcast::error::RecvError::Lagged(n)) => {
328                        // Subscriber lagged behind, log and continue
329                        tracing::warn!("Event bridge lagged by {} events", n);
330                    }
331                }
332            }
333        });
334    }
335
    /// Convert a P2pEvent to a NodeEvent
    ///
    /// Returns `None` for events with no public `NodeEvent` equivalent
    /// (NAT traversal progress, bootstrap status, authentication, address
    /// updates, relay establishment). Uses the `From` trait implementation
    /// for DisconnectReason conversion.
    fn convert_event(p2p_event: P2pEvent) -> Option<NodeEvent> {
        match p2p_event {
            P2pEvent::PeerConnected {
                peer_id,
                addr,
                side: _, // connection side is not surfaced in the public event
                traversal_method,
            } => Some(NodeEvent::PeerConnected {
                peer_id,
                addr,
                method: traversal_method,
                direct: traversal_method.is_direct(),
            }),
            P2pEvent::PeerDisconnected { peer_id, reason } => Some(NodeEvent::PeerDisconnected {
                peer_id,
                reason: reason.into(), // Use From trait
            }),
            P2pEvent::ExternalAddressDiscovered { addr } => {
                Some(NodeEvent::ExternalAddressDiscovered { addr })
            }
            P2pEvent::PortMappingEstablished { external_addr } => {
                Some(NodeEvent::PortMappingEstablished { external_addr })
            }
            P2pEvent::PortMappingRenewed { external_addr } => {
                Some(NodeEvent::PortMappingRenewed { external_addr })
            }
            P2pEvent::PortMappingAddressChanged {
                previous_addr,
                external_addr,
            } => Some(NodeEvent::PortMappingAddressChanged {
                previous_addr,
                external_addr,
            }),
            P2pEvent::PortMappingFailed { error } => Some(NodeEvent::PortMappingFailed { error }),
            P2pEvent::PortMappingRemoved { external_addr } => {
                Some(NodeEvent::PortMappingRemoved { external_addr })
            }
            P2pEvent::DirectPathStatus { peer_id, status } => {
                Some(NodeEvent::DirectPathStatus { peer_id, status })
            }
            P2pEvent::DataReceived { peer_id, bytes } => Some(NodeEvent::DataReceived {
                peer_id,
                stream_id: 0, // P2pEvent doesn't track stream IDs
                bytes,
            }),
            P2pEvent::ConstrainedDataReceived {
                remote_addr,
                connection_id,
                data,
            } => {
                // For constrained data, derive a synthetic peer ID from the transport address.
                // Only the first 8 bytes of the 32-byte ID carry entropy (the hash);
                // the remaining 24 bytes stay zero.
                // NOTE(review): DefaultHasher output is not stable across Rust
                // releases, so these synthetic IDs are only meaningful within a
                // single process — confirm nothing persists them.
                let synthetic_peer_id = {
                    use std::collections::hash_map::DefaultHasher;
                    use std::hash::{Hash, Hasher};
                    let synthetic_addr = remote_addr.to_synthetic_socket_addr();
                    let mut hasher = DefaultHasher::new();
                    synthetic_addr.hash(&mut hasher);
                    let hash = hasher.finish();
                    let mut peer_id_bytes = [0u8; 32];
                    peer_id_bytes[..8].copy_from_slice(&hash.to_le_bytes());
                    PeerId(peer_id_bytes)
                };
                Some(NodeEvent::DataReceived {
                    peer_id: synthetic_peer_id,
                    // Reuse the transport connection ID as a stand-in stream ID.
                    stream_id: connection_id as u64,
                    bytes: data.len(),
                })
            }
            P2pEvent::MdnsServiceAdvertised {
                service,
                namespace,
                instance_fullname,
            } => Some(NodeEvent::MdnsServiceAdvertised {
                service,
                namespace,
                instance_fullname,
            }),
            P2pEvent::MdnsPeerDiscovered { peer } => Some(NodeEvent::MdnsPeerDiscovered { peer }),
            P2pEvent::MdnsPeerUpdated { peer } => Some(NodeEvent::MdnsPeerUpdated { peer }),
            P2pEvent::MdnsPeerRemoved { peer } => Some(NodeEvent::MdnsPeerRemoved { peer }),
            P2pEvent::MdnsPeerEligible { peer } => Some(NodeEvent::MdnsPeerEligible { peer }),
            P2pEvent::MdnsPeerIneligible { peer, reason } => {
                Some(NodeEvent::MdnsPeerIneligible { peer, reason })
            }
            P2pEvent::MdnsPeerApprovalRequired { peer, reason } => {
                Some(NodeEvent::MdnsPeerApprovalRequired { peer, reason })
            }
            P2pEvent::MdnsAutoConnectAttempted { peer, addresses } => {
                Some(NodeEvent::MdnsAutoConnectAttempted { peer, addresses })
            }
            P2pEvent::MdnsAutoConnectSucceeded {
                peer,
                authenticated_peer_id,
                remote_addr,
            } => Some(NodeEvent::MdnsAutoConnectSucceeded {
                peer,
                authenticated_peer_id,
                remote_addr,
            }),
            P2pEvent::MdnsAutoConnectFailed {
                peer,
                addresses,
                error,
            } => Some(NodeEvent::MdnsAutoConnectFailed {
                peer,
                addresses,
                error,
            }),
            // Events without direct NodeEvent equivalents are ignored
            P2pEvent::NatTraversalProgress { .. }
            | P2pEvent::BootstrapStatus { .. }
            | P2pEvent::PeerAuthenticated { .. }
            | P2pEvent::PeerAddressUpdated { .. }
            | P2pEvent::RelayEstablished { .. } => None,
        }
    }
455
    // === Identity ===

    /// Get this node's peer ID
    ///
    /// The peer ID is derived from the Ed25519 public key and is
    /// the unique identifier for this node on the network.
    // NOTE(review): the module docs state Ed25519 is used only for the
    // 32-byte PeerId while authentication uses ML-DSA-65 — confirm the
    // derivation source against P2pEndpoint.
    pub fn peer_id(&self) -> PeerId {
        self.inner.peer_id()
    }

    /// Get the local bind address
    ///
    /// Returns `None` if the endpoint hasn't bound yet.
    pub fn local_addr(&self) -> Option<SocketAddr> {
        self.inner.local_addr()
    }

    /// Get the observed external address
    ///
    /// This is the address as seen by other peers on the network.
    /// Returns `None` if no external address has been discovered yet.
    pub fn external_addr(&self) -> Option<SocketAddr> {
        self.inner.external_addr()
    }

    /// Return the latest best-effort direct-path status for a peer, when known.
    pub fn direct_path_status(&self, peer_id: PeerId) -> Option<crate::DirectPathStatus> {
        self.inner.direct_path_status(peer_id)
    }

    /// Get the ML-DSA-65 public key bytes (1952 bytes)
    pub fn public_key_bytes(&self) -> &[u8] {
        self.inner.public_key_bytes()
    }

    /// Get access to the underlying P2pEndpoint for advanced operations.
    ///
    /// Escape hatch for callers needing the full endpoint API; most
    /// applications should stay on the `Node` facade.
    pub fn inner_endpoint(&self) -> &Arc<P2pEndpoint> {
        &self.inner
    }

    /// Get the transport registry for this node
    ///
    /// The transport registry contains all registered transport providers (UDP, BLE, etc.)
    /// that this node can use for connectivity.
    pub fn transport_registry(&self) -> &crate::transport::TransportRegistry {
        self.inner.transport_registry()
    }
503
504    // === Connections ===
505
506    /// Connect to a peer by address.
507    ///
508    /// Thin facade over [`P2pEndpoint::connect_addr`], which uses the unified
509    /// outbound connectivity orchestrator.
510    pub async fn connect_addr(&self, addr: SocketAddr) -> Result<PeerConnection, NodeError> {
511        self.inner
512            .connect_addr(addr)
513            .await
514            .map_err(NodeError::Endpoint)
515    }
516
517    /// Connect to a peer by durable peer ID.
518    ///
519    /// Thin facade over the unified peer-oriented [`P2pEndpoint`] connect path.
520    /// Strategy selection remains internal to the endpoint.
521    pub async fn connect_peer(&self, peer_id: PeerId) -> Result<PeerConnection, NodeError> {
522        self.inner
523            .connect_peer(peer_id)
524            .await
525            .map_err(NodeError::Endpoint)
526    }
527
528    /// Connect to a peer by durable peer ID.
529    ///
530    /// Compatibility-oriented alias retained for older callers. Prefer
531    /// [`Self::connect_peer`] as the canonical peer-oriented public surface.
532    #[deprecated(note = "use connect_peer(peer_id) for the canonical peer-oriented API")]
533    pub async fn connect(&self, peer_id: PeerId) -> Result<PeerConnection, NodeError> {
534        self.connect_peer(peer_id).await
535    }
536
537    /// Connect to a peer by durable peer ID plus explicit address hints.
538    ///
539    /// Use this when the caller has candidate addresses for the peer and wants
540    /// the transport to combine those hints with peer-authenticated fallback
541    /// orchestration.
542    pub async fn connect_peer_with_addrs(
543        &self,
544        peer_id: PeerId,
545        addrs: Vec<SocketAddr>,
546    ) -> Result<PeerConnection, NodeError> {
547        self.inner
548            .connect_peer_with_addrs(peer_id, addrs)
549            .await
550            .map_err(NodeError::Endpoint)
551    }
552
553    /// Merge externally discovered peer hints into the node's transport view.
554    ///
555    /// This is the advanced discovery bridge for callers that learn peer
556    /// addresses or assist-role capability hints from higher layers.
557    pub async fn upsert_peer_hints(
558        &self,
559        peer_id: PeerId,
560        addrs: Vec<SocketAddr>,
561        capabilities: Option<PeerCapabilities>,
562    ) {
563        self.inner
564            .upsert_peer_hints(peer_id, addrs, capabilities)
565            .await;
566    }
567
568    /// Accept an incoming connection
569    ///
570    /// Waits for and accepts the next incoming connection.
571    /// Returns `None` if the node is shutting down.
572    ///
573    /// # Example
574    ///
575    /// ```rust,ignore
576    /// while let Some(conn) = node.accept().await {
577    ///     println!("Accepted connection from: {:?}", conn.peer_id);
578    ///     // Handle connection...
579    /// }
580    /// ```
581    pub async fn accept(&self) -> Option<PeerConnection> {
582        self.inner.accept().await
583    }
584
585    /// Add a known peer dynamically.
586    ///
587    /// Thin facade over [`P2pEndpoint::add_known_peer`]. Known peers help with
588    /// initial connectivity, discovery, and NAT traversal coordination.
589    pub async fn add_peer(&self, addr: SocketAddr) {
590        self.inner.add_known_peer(addr).await;
591    }
592
593    /// Connect to all known peers
594    ///
595    /// Returns the number of successful connections.
596    pub async fn connect_known_peers(&self) -> Result<usize, NodeError> {
597        self.inner
598            .connect_known_peers()
599            .await
600            .map_err(NodeError::Endpoint)
601    }
602
603    /// Disconnect from a peer
604    pub async fn disconnect(&self, peer_id: &PeerId) -> Result<(), NodeError> {
605        self.inner
606            .disconnect(peer_id)
607            .await
608            .map_err(NodeError::Endpoint)
609    }
610
611    /// Get list of connected peers
612    pub async fn connected_peers(&self) -> Vec<PeerConnection> {
613        self.inner.connected_peers().await
614    }
615
616    /// Check if connected to a peer
617    pub async fn is_connected(&self, peer_id: &PeerId) -> bool {
618        self.inner.is_connected(peer_id).await
619    }
620
621    /// Get a best-effort connection health snapshot for a peer.
622    pub async fn connection_health(&self, peer_id: &PeerId) -> ConnectionHealth {
623        self.inner.connection_health(peer_id).await
624    }
625
626    /// Subscribe to lifecycle events for a specific peer.
627    pub fn subscribe_peer_events(
628        &self,
629        peer_id: &PeerId,
630    ) -> broadcast::Receiver<PeerLifecycleEvent> {
631        self.inner.subscribe_peer_events(peer_id)
632    }
633
634    /// Subscribe to lifecycle events for all peers.
635    pub fn subscribe_all_peer_events(&self) -> broadcast::Receiver<(PeerId, PeerLifecycleEvent)> {
636        self.inner.subscribe_all_peer_events()
637    }
638
639    // === Messaging ===
640
641    /// Send data to a peer
642    pub async fn send(&self, peer_id: &PeerId, data: &[u8]) -> Result<(), NodeError> {
643        self.inner
644            .send(peer_id, data)
645            .await
646            .map_err(NodeError::Endpoint)
647    }
648
649    /// Send data and wait until the remote receive pipeline accepts it.
650    pub async fn send_with_receive_ack(
651        &self,
652        peer_id: &PeerId,
653        data: &[u8],
654        timeout: Duration,
655    ) -> Result<(), NodeError> {
656        self.inner
657            .send_with_receive_ack(peer_id, data, timeout)
658            .await
659            .map_err(NodeError::Endpoint)
660    }
661
662    /// Actively probe peer liveness and measure round-trip time.
663    ///
664    /// Sends a lightweight probe envelope and waits for the peer's reader task
665    /// to acknowledge it. Returns the measured round-trip duration on success.
666    /// Probe traffic is invisible to [`Self::recv`] — it does not emit
667    /// `DataReceived` events or deliver payloads.
668    pub async fn probe_peer(
669        &self,
670        peer_id: &PeerId,
671        timeout: Duration,
672    ) -> Result<Duration, NodeError> {
673        self.inner
674            .probe_peer(peer_id, timeout)
675            .await
676            .map_err(NodeError::Endpoint)
677    }
678
679    /// Receive data from any peer
680    pub async fn recv(&self) -> Result<(PeerId, Vec<u8>), NodeError> {
681        self.inner.recv().await.map_err(NodeError::Endpoint)
682    }
683
684    // === Observability ===
685
686    /// Get a snapshot of the node's current status
687    ///
688    /// This provides a practical snapshot of the node's state,
689    /// including a best-effort NAT behavior hint, connectivity,
690    /// relay/coordinator hints, and performance.
691    ///
692    /// # Example
693    ///
694    /// ```rust,ignore
695    /// let status = node.status().await;
696    /// println!("NAT behavior hint: {}", status.nat_type);
697    /// println!("Connected peers: {}", status.connected_peers);
698    /// println!("Acting as relay: {}", status.is_relaying);
699    /// ```
    pub async fn status(&self) -> NodeStatus {
        let stats = self.inner.stats().await;
        let connected_peers = self.inner.connected_peers().await;

        // Derive a best-effort NAT behavior hint from native connectivity
        // outcomes only. This is observational telemetry, not authoritative
        // NAT classification.
        // NOTE(review): detect_nat_type is a private helper defined below
        // this chunk — its exact heuristic is not visible here.
        let nat_type = self.detect_nat_type(&stats);

        // Address knowledge and reachability are separate concepts.
        // A global address is not proof of direct reachability.
        let local_addr = self.local_addr();
        let external_addr = self.external_addr();

        // Collect ALL external addresses (both IPv4 and IPv6) from all
        // connections and QUIC paths. This is critical for dual-stack nodes
        // where different peers report different address families.
        let mut external_addrs = self.inner.all_external_addrs();
        // Ensure the primary external address is included (backward compat);
        // it is prepended so it stays first in the reported list.
        if let Some(addr) = external_addr {
            if !external_addrs.contains(&addr) {
                external_addrs.insert(0, addr);
            }
        }

        // Calculate hole punch success rate; zero attempts reports 0.0
        // rather than dividing by zero.
        let hole_punch_success_rate = if stats.nat_traversal_attempts > 0 {
            stats.nat_traversal_successes as f64 / stats.nat_traversal_attempts as f64
        } else {
            0.0
        };

        // True when any known address (external or local) is globally scoped.
        let has_global_address = external_addrs
            .iter()
            .copied()
            .chain(local_addr)
            .any(|addr| {
                socket_addr_scope(addr)
                    .is_some_and(|scope| scope == crate::ReachabilityScope::Global)
            });
        let port_mapping = self.inner.port_mapping_snapshot();
        let mdns = self.inner.mdns_snapshot();

        // A node is directly reachable only after fresh, peer-verified direct
        // inbound evidence. Scope is freshness-aware too, so an old global
        // observation cannot keep inflating current reachability.
        // Scopes are ordered widest-first; find_map returns the first one
        // whose last observation is within DIRECT_REACHABILITY_TTL.
        let fresh_scope = [
            (
                crate::ReachabilityScope::Global,
                stats.last_direct_global_at,
            ),
            (
                crate::ReachabilityScope::LocalNetwork,
                stats.last_direct_local_at,
            ),
            (
                crate::ReachabilityScope::Loopback,
                stats.last_direct_loopback_at,
            ),
        ]
        .into_iter()
        .find_map(|(scope, seen)| {
            seen.filter(|instant| instant.elapsed() <= DIRECT_REACHABILITY_TTL)
                .map(|_| scope)
        });
        let can_receive_direct =
            stats.active_direct_incoming_connections > 0 || fresh_scope.is_some();
        let direct_reachability_scope = fresh_scope;

        // Relay/coordinator activity is still best-effort, but we can surface
        // a conservative runtime snapshot from existing NAT/relay state instead
        // of hard-coded false/zero placeholders.
        let runtime_assist = self.inner.runtime_assist_snapshot().await;
        let relay_service_enabled = self.inner.relay_service_enabled();
        let coordinator_service_enabled = self.inner.coordinator_service_enabled();
        let bootstrap_service_enabled = self.inner.bootstrap_service_enabled();
        let is_relaying = runtime_assist.active_relay_sessions > 0;
        let relay_sessions = runtime_assist.active_relay_sessions;
        let relay_bytes_forwarded = runtime_assist.relay_bytes_forwarded;
        let is_coordinating = runtime_assist.successful_coordinations > 0;
        // Saturate rather than fail if the u64 count exceeds usize on this
        // platform.
        let coordination_sessions =
            usize::try_from(runtime_assist.successful_coordinations).unwrap_or(usize::MAX);

        // Calculate average RTT across peers that currently report one;
        // peers without an RTT sample are excluded from the mean.
        let mut total_rtt = Duration::ZERO;
        let mut rtt_count = 0u32;
        for peer in &connected_peers {
            if let Some(metrics) = self.inner.connection_metrics(&peer.peer_id).await {
                if let Some(rtt) = metrics.rtt {
                    total_rtt += rtt;
                    rtt_count += 1;
                }
            }
        }
        let avg_rtt = if rtt_count > 0 {
            total_rtt / rtt_count
        } else {
            Duration::ZERO
        };

        NodeStatus {
            peer_id: self.peer_id(),
            // Fall back to 0.0.0.0:0 when unbound; the inner unwrap_or_else
            // guards against a (practically impossible) parse failure.
            local_addr: local_addr.unwrap_or_else(|| {
                "0.0.0.0:0".parse().unwrap_or_else(|_| {
                    SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0)
                })
            }),
            external_addrs,
            nat_type,
            can_receive_direct,
            direct_reachability_scope,
            has_global_address,
            port_mapping_active: port_mapping.active,
            port_mapping_addr: port_mapping.external_addr,
            mdns_browsing: mdns.browsing,
            mdns_advertising: mdns.advertising,
            mdns_discovered_peers: mdns.discovered_peers.len(),
            relay_service_enabled,
            coordinator_service_enabled,
            bootstrap_service_enabled,
            connected_peers: connected_peers.len(),
            active_connections: stats.active_connections,
            pending_connections: 0, // Not tracked yet
            direct_connections: stats.direct_connections,
            relayed_connections: stats.relayed_connections,
            hole_punch_success_rate,
            is_relaying,
            relay_sessions,
            relay_bytes_forwarded,
            is_coordinating,
            coordination_sessions,
            avg_rtt,
            uptime: self.start_time.elapsed(),
        }
    }
835
    /// Subscribe to node events
    ///
    /// Returns a receiver for all significant node events including
    /// connections, disconnections, NAT detection, and relay activity.
    ///
    /// Each call creates an independent receiver on the node's broadcast
    /// channel; a receiver that falls behind the channel's capacity may
    /// miss events (standard `tokio` broadcast semantics).
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let mut events = node.subscribe();
    /// tokio::spawn(async move {
    ///     while let Ok(event) = events.recv().await {
    ///         match event {
    ///             NodeEvent::PeerConnected { peer_id, .. } => {
    ///                 println!("Connected: {:?}", peer_id);
    ///             }
    ///             _ => {}
    ///         }
    ///     }
    /// });
    /// ```
    pub fn subscribe(&self) -> broadcast::Receiver<NodeEvent> {
        self.event_tx.subscribe()
    }
859
    /// Subscribe to raw P2pEvents (for advanced use)
    ///
    /// This provides access to the underlying P2pEndpoint events.
    /// Most applications should use `subscribe()` for NodeEvents.
    ///
    /// The receiver comes straight from the inner endpoint rather than the
    /// node-level `NodeEvent` channel used by [`Self::subscribe`].
    pub fn subscribe_raw(&self) -> broadcast::Receiver<P2pEvent> {
        self.inner.subscribe()
    }
867
    // === Shutdown ===

    /// Gracefully shut down the node
    ///
    /// This closes all connections and releases resources.
    ///
    /// Consumes this handle. Note that [`Node`] clones share the same inner
    /// endpoint (see the `Clone` impl), so shutting down any one clone shuts
    /// the node down for all of them.
    pub async fn shutdown(self) {
        self.inner.shutdown().await;
    }
876
    /// Check if the node is still running
    ///
    /// Delegates to the inner `P2pEndpoint`; expected to report `false`
    /// after [`shutdown`](Self::shutdown) completes on this node or any
    /// clone sharing the same inner endpoint.
    pub fn is_running(&self) -> bool {
        self.inner.is_running()
    }
881
882    // === Private Helpers ===
883
884    /// Derive a coarse NAT behavior hint from native QUIC connection outcomes.
885    ///
886    /// This does not classify NAT mapping/filtering behavior in the RFC 4787 /
887    /// RFC 5780 sense.
888    fn detect_nat_type(&self, stats: &crate::p2p_endpoint::EndpointStats) -> NatType {
889        // This remains a soft debug hint only. Do not treat it as direct
890        // reachability evidence.
891        if stats.direct_connections > 0 && stats.relayed_connections == 0 {
892            return NatType::FullCone;
893        }
894
895        if stats.direct_connections > 0 && stats.relayed_connections > 0 {
896            return NatType::PortRestricted;
897        }
898
899        if stats.relayed_connections > stats.direct_connections {
900            return NatType::Symmetric;
901        }
902
903        NatType::Unknown
904    }
905}
906
907// Enable cloning through Arc
908impl Clone for Node {
909    fn clone(&self) -> Self {
910        Self {
911            inner: Arc::clone(&self.inner),
912            start_time: self.start_time,
913            event_tx: self.event_tx.clone(),
914        }
915    }
916}
917
#[cfg(test)]
mod tests {
    use super::*;
    use crate::derive_peer_id_from_public_key;

    #[tokio::test]
    async fn test_node_new_default() {
        let node = Node::new().await;
        assert!(node.is_ok(), "Node::new() should succeed: {:?}", node.err());

        let node = node.unwrap();
        assert!(node.is_running());

        // Peer ID should be valid (non-zero)
        let peer_id = node.peer_id();
        assert_ne!(peer_id.0, [0u8; 32]);

        node.shutdown().await;
    }

    #[tokio::test]
    async fn test_node_bind() {
        let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
        let node = Node::bind(addr).await;
        assert!(node.is_ok(), "Node::bind() should succeed");

        let node = node.unwrap();
        assert!(node.local_addr().is_some());

        node.shutdown().await;
    }

    #[tokio::test]
    async fn test_node_with_peers() {
        let peers = vec!["127.0.0.1:9000".parse().unwrap()];
        let node = Node::with_peers(peers).await;
        assert!(node.is_ok(), "Node::with_peers() should succeed");

        node.unwrap().shutdown().await;
    }

    #[tokio::test]
    async fn test_node_with_config() {
        let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
        let config = NodeConfig::builder().bind_addr(addr).build();

        let node = Node::with_config(config).await;
        assert!(node.is_ok(), "Node::with_config() should succeed");

        node.unwrap().shutdown().await;
    }

    #[tokio::test]
    async fn test_node_status() {
        let node = Node::new().await.unwrap();
        let status = node.status().await;

        // Check status fields are populated
        assert_ne!(status.peer_id.0, [0u8; 32]);
        assert_eq!(status.connected_peers, 0); // No connections yet
        assert!(!status.port_mapping_active);
        assert_eq!(status.port_mapping_addr, None);
        assert!(status.relay_service_enabled);
        assert!(status.coordinator_service_enabled);
        assert!(status.bootstrap_service_enabled);
        assert!(!status.is_relaying);
        assert!(!status.is_coordinating);

        node.shutdown().await;
    }

    #[tokio::test]
    async fn test_node_subscribe() {
        let node = Node::new().await.unwrap();
        let _events = node.subscribe();

        // Just verify subscription works
        node.shutdown().await;
    }

    #[tokio::test]
    async fn test_node_is_clone() {
        let node1 = Node::new().await.unwrap();
        let node2 = node1.clone();

        // Both should have same peer ID
        assert_eq!(node1.peer_id(), node2.peer_id());

        node1.shutdown().await;
        // node2 still references the same Arc, so shutdown already happened
    }

    #[tokio::test]
    async fn test_node_debug() {
        let node = Node::new().await.unwrap();
        let debug_str = format!("{:?}", node);
        assert!(debug_str.contains("Node"));
        assert!(debug_str.contains("peer_id"));

        node.shutdown().await;
    }

    #[tokio::test]
    async fn test_node_identity() {
        use crate::crypto::raw_public_keys::key_utils::derive_peer_id_from_key_bytes;

        let node = Node::new().await.unwrap();

        // Verify identity methods
        let peer_id = node.peer_id();
        let public_key = node.public_key_bytes();

        // Peer ID should be derived from public key (ML-DSA-65)
        let derived = derive_peer_id_from_key_bytes(public_key).unwrap();
        assert_eq!(peer_id, derived);

        node.shutdown().await;
    }

    #[tokio::test]
    async fn test_connected_peers_empty() {
        let node = Node::new().await.unwrap();
        let peers = node.connected_peers().await;
        assert!(peers.is_empty());

        node.shutdown().await;
    }

    // Full peer establishment remains exercised in the default-feature matrix.
    // The stripped no-default-features lib configuration is a portability/
    // compile-surface check and does not guarantee loopback connection success.
    #[cfg(all(feature = "platform-verifier", feature = "network-discovery"))]
    #[tokio::test]
    async fn test_connect_peer_with_addrs_uses_explicit_hint() {
        let listener = Node::bind("127.0.0.1:0".parse().unwrap()).await.unwrap();
        let dialer = Node::bind("127.0.0.1:0".parse().unwrap()).await.unwrap();

        let listener_addr = listener.local_addr().expect("listener addr");
        // Dial loopback explicitly when bound to the wildcard address.
        let listener_addr = if listener_addr.ip().is_unspecified() {
            SocketAddr::new(
                std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
                listener_addr.port(),
            )
        } else {
            listener_addr
        };
        let peer_conn = tokio::time::timeout(
            Duration::from_secs(30),
            dialer.connect_peer_with_addrs(listener.peer_id(), vec![listener_addr]),
        )
        .await
        .expect("connect should not time out")
        .expect("dialer should connect using explicit address hint");
        assert_eq!(peer_conn.peer_id, listener.peer_id());

        let accepted = tokio::time::timeout(std::time::Duration::from_secs(5), listener.accept())
            .await
            .expect("accept should complete")
            .expect("listener should accept");
        assert_eq!(accepted.peer_id, dialer.peer_id());

        dialer.shutdown().await;
        listener.shutdown().await;
    }

    #[cfg(all(feature = "platform-verifier", feature = "network-discovery"))]
    #[tokio::test]
    async fn test_connect_peer_uses_upserted_peer_hints() {
        let listener = Node::bind("127.0.0.1:0".parse().unwrap()).await.unwrap();
        let dialer = Node::bind("127.0.0.1:0".parse().unwrap()).await.unwrap();

        let listener_addr = listener.local_addr().expect("listener addr");
        // Dial loopback explicitly when bound to the wildcard address.
        let listener_addr = if listener_addr.ip().is_unspecified() {
            SocketAddr::new(
                std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
                listener_addr.port(),
            )
        } else {
            listener_addr
        };

        dialer
            .upsert_peer_hints(listener.peer_id(), vec![listener_addr], None)
            .await;

        let peer_conn = tokio::time::timeout(
            Duration::from_secs(30),
            dialer.connect_peer(listener.peer_id()),
        )
        .await
        .expect("connect should not time out")
        .expect("dialer should connect using upserted peer hints");
        assert_eq!(peer_conn.peer_id, listener.peer_id());

        let accepted = tokio::time::timeout(std::time::Duration::from_secs(5), listener.accept())
            .await
            .expect("accept should complete")
            .expect("listener should accept");
        assert_eq!(accepted.peer_id, dialer.peer_id());

        dialer.shutdown().await;
        listener.shutdown().await;
    }

    // Pure synchronous assertions — there is no async work and no `.await`
    // here, so a plain #[test] avoids spinning up a Tokio runtime for nothing.
    #[test]
    fn test_node_error_types() {
        // Test error conversions
        let err = NodeError::Creation("test".to_string());
        assert!(err.to_string().contains("test"));

        let err = NodeError::Connection("connection failed".to_string());
        assert!(err.to_string().contains("connection"));

        let err = NodeError::ShuttingDown;
        assert!(err.to_string().contains("shutting down"));
    }

    #[tokio::test]
    async fn test_node_with_keypair_persistence() {
        use crate::crypto::raw_public_keys::key_utils::generate_ml_dsa_keypair;

        // Generate an ML-DSA-65 keypair
        let (public_key, secret_key) = generate_ml_dsa_keypair().unwrap();
        let expected_peer_id = derive_peer_id_from_public_key(&public_key);
        let expected_public_key_bytes = public_key.as_bytes().to_vec();

        // Create node with the keypair
        let node = Node::with_keypair(public_key, secret_key).await.unwrap();

        // Verify the node uses the same identity
        assert_eq!(node.peer_id(), expected_peer_id);
        assert_eq!(node.public_key_bytes(), expected_public_key_bytes);

        node.shutdown().await;
    }

    #[tokio::test]
    async fn test_node_keypair_via_config() {
        use crate::crypto::raw_public_keys::key_utils::generate_ml_dsa_keypair;

        // Generate an ML-DSA-65 keypair
        let (public_key, secret_key) = generate_ml_dsa_keypair().unwrap();
        let expected_peer_id = derive_peer_id_from_public_key(&public_key);
        let expected_public_key_bytes = public_key.as_bytes().to_vec();

        // Create node via config with keypair
        let config = NodeConfig::with_keypair(public_key, secret_key);
        let node = Node::with_config(config).await.unwrap();

        // Verify the node uses the same identity
        assert_eq!(node.peer_id(), expected_peer_id);
        assert_eq!(node.public_key_bytes(), expected_public_key_bytes);

        node.shutdown().await;
    }

    #[tokio::test]
    async fn test_node_event_bridge_exists() {
        let node = Node::new().await.unwrap();

        // Subscribe to events - this should work
        let mut events = node.subscribe();

        // The event channel should be connected (won't receive anything yet,
        // but the bridge task should be running)
        // We can't easily test event reception without connections,
        // but we verify the infrastructure is in place
        assert!(events.try_recv().is_err()); // No events yet

        node.shutdown().await;
    }

    #[tokio::test]
    async fn test_node_with_host_identity() {
        use crate::host_identity::HostIdentity;

        // Create a temporary directory for storage
        let temp_dir =
            std::env::temp_dir().join(format!("ant-quic-test-node-{}", std::process::id()));
        let _ = std::fs::create_dir_all(&temp_dir);

        // Generate a HostIdentity
        let host = HostIdentity::generate();
        let network_id = b"test-network";

        // Create first node with host identity
        let node1 = Node::with_host_identity(&host, network_id, &temp_dir)
            .await
            .unwrap();
        let peer_id_1 = node1.peer_id();
        let public_key_1 = node1.public_key_bytes().to_vec();

        // Verify the node is running
        assert!(node1.is_running());

        // Shutdown and cleanup
        node1.shutdown().await;

        // Create second node with same host identity - should have same identity
        let node2 = Node::with_host_identity(&host, network_id, &temp_dir)
            .await
            .unwrap();
        let peer_id_2 = node2.peer_id();
        let public_key_2 = node2.public_key_bytes().to_vec();

        // Verify both nodes have the same identity
        assert_eq!(peer_id_1, peer_id_2);
        assert_eq!(public_key_1, public_key_2);

        node2.shutdown().await;

        // Cleanup temp directory
        let _ = std::fs::remove_dir_all(&temp_dir);
    }

    #[tokio::test]
    async fn test_node_host_identity_per_network_isolation() {
        use crate::host_identity::HostIdentity;

        // Create a temporary directory for storage
        let temp_dir =
            std::env::temp_dir().join(format!("ant-quic-test-isolation-{}", std::process::id()));
        let _ = std::fs::create_dir_all(&temp_dir);

        // Generate a HostIdentity
        let host = HostIdentity::generate();

        // Create nodes with different network IDs
        let node1 = Node::with_host_identity(&host, b"network-1", &temp_dir)
            .await
            .unwrap();
        let peer_id_1 = node1.peer_id();

        let node2 = Node::with_host_identity(&host, b"network-2", &temp_dir)
            .await
            .unwrap();
        let peer_id_2 = node2.peer_id();

        // Different networks should have different identities (privacy)
        assert_ne!(peer_id_1, peer_id_2);

        node1.shutdown().await;
        node2.shutdown().await;

        // Cleanup temp directory
        let _ = std::fs::remove_dir_all(&temp_dir);
    }
}