ant_quic/
link_transport.rs

1// Copyright 2024 Saorsa Labs Ltd.
2//
3// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
4// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
5//
6// Full details available at https://saorsalabs.com/licenses
7
8//! # Link Transport Abstraction Layer
9//!
10//! This module provides the [`LinkTransport`] and [`LinkConn`] traits that abstract
11//! the transport layer for overlay networks like saorsa-core. This enables:
12//!
13//! - **Version decoupling**: Overlays can compile against a stable trait interface
14//!   while ant-quic evolves underneath
15//! - **Testing**: Mock transports for unit testing overlay logic
16//! - **Alternative transports**: Future support for WebRTC, TCP fallback, etc.
17//!
18//! ## Architecture
19//!
20//! ```text
21//! ┌─────────────────────────────────────────────────────────────────┐
22//! │                      saorsa-core (overlay)                       │
23//! │  DHT routing │ Record storage │ Greedy routing │ Naming         │
24//! └─────────────────────────────────────────────────────────────────┘
25//!                              │
26//!                              ▼
27//! ┌─────────────────────────────────────────────────────────────────┐
28//! │                    LinkTransport trait                          │
29//! │  local_peer() │ peer_table() │ dial() │ accept() │ subscribe()  │
30//! └─────────────────────────────────────────────────────────────────┘
31//!                              │
32//!                              ▼
33//! ┌─────────────────────────────────────────────────────────────────┐
34//! │                  ant-quic P2pEndpoint                            │
35//! │  QUIC transport │ NAT traversal │ PQC │ Connection management   │
36//! └─────────────────────────────────────────────────────────────────┘
37//! ```
38//!
39//! ## Example
40//!
41//! ```rust,ignore
42//! use ant_quic::link_transport::{LinkTransport, LinkConn, ProtocolId};
43//!
44//! // Define your overlay's protocol
45//! const DHT_PROTOCOL: ProtocolId = ProtocolId::from_static(b"saorsa-dht/1.0.0");
46//!
47//! async fn run_overlay<T: LinkTransport>(transport: T) -> anyhow::Result<()> {
48//!     // Accept incoming connections for our protocol
49//!     let mut incoming = transport.accept(DHT_PROTOCOL);
50//!     
51//!     // Dial a peer
52//!     let peer_id = /* ... */;
53//!     let conn = transport.dial(peer_id, DHT_PROTOCOL).await?;
54//!     
55//!     // Open a bidirectional stream
56//!     let (send, recv) = conn.open_bi().await?;
57//!     
58//!     Ok(())
59//! }
60//! ```
61
62use std::fmt;
63use std::future::Future;
64use std::net::SocketAddr;
65use std::pin::Pin;
66use std::sync::Arc;
67use std::time::{Duration, Instant, SystemTime};
68
69use bytes::Bytes;
70use thiserror::Error;
71use tokio::sync::broadcast;
72
73use crate::nat_traversal_api::PeerId;
74
75// ============================================================================
76// Protocol Identifier
77// ============================================================================
78
/// Identifies which overlay protocol a connection belongs to, so that several
/// overlays can be multiplexed over a single transport.
///
/// An ID is exactly 16 bytes. Equality and hashing operate on the raw bytes,
/// while the bytes may also spell out a human-readable name for debugging.
///
/// # Examples
///
/// ```rust
/// use ant_quic::link_transport::ProtocolId;
///
/// // Built at compile time from a byte string (zero-padded or cut to 16 bytes).
/// const DHT: ProtocolId = ProtocolId::from_static(b"saorsa-dht/1.0.0");
///
/// // Built from an explicit byte array.
/// let proto = ProtocolId::new([0x73, 0x61, 0x6f, 0x72, 0x73, 0x61, 0x2d, 0x64,
///                              0x68, 0x74, 0x2f, 0x31, 0x2e, 0x30, 0x2e, 0x30]);
/// assert!(proto == DHT);
/// ```
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct ProtocolId(pub [u8; 16]);

impl ProtocolId {
    /// Protocol used by connections that do not negotiate one explicitly.
    pub const DEFAULT: Self = Self::from_static(b"ant-quic/default");

    /// Protocol carrying NAT traversal coordination messages.
    pub const NAT_TRAVERSAL: Self = Self::from_static(b"ant-quic/nat");

    /// Protocol carrying relay traffic.
    pub const RELAY: Self = Self::from_static(b"ant-quic/relay");

    /// Wraps 16 raw bytes as a protocol ID without altering them.
    #[inline]
    pub const fn new(bytes: [u8; 16]) -> Self {
        Self(bytes)
    }

    /// Builds a protocol ID from a byte string, usable in `const` contexts.
    ///
    /// At most the first 16 bytes of `s` are used; shorter input is padded
    /// with trailing zero bytes, longer input is silently truncated.
    #[inline]
    pub const fn from_static(s: &[u8]) -> Self {
        let mut id = [0u8; 16];
        let mut pos = 0;
        // Manual loop: iterators and `for` are unavailable inside `const fn`.
        while pos < 16 && pos < s.len() {
            id[pos] = s[pos];
            pos += 1;
        }
        Self(id)
    }

    /// Borrows the 16 raw bytes of this protocol ID.
    #[inline]
    pub const fn as_bytes(&self) -> &[u8; 16] {
        &self.0
    }
}
137
138impl Default for ProtocolId {
139    fn default() -> Self {
140        Self::DEFAULT
141    }
142}
143
144impl fmt::Debug for ProtocolId {
145    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
146        // Try to display as UTF-8 string, trimming null bytes
147        let end = self.0.iter().position(|&b| b == 0).unwrap_or(16);
148        if let Ok(s) = std::str::from_utf8(&self.0[..end]) {
149            write!(f, "ProtocolId({:?})", s)
150        } else {
151            write!(f, "ProtocolId({:?})", hex::encode(self.0))
152        }
153    }
154}
155
156impl fmt::Display for ProtocolId {
157    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
158        let end = self.0.iter().position(|&b| b == 0).unwrap_or(16);
159        if let Ok(s) = std::str::from_utf8(&self.0[..end]) {
160            write!(f, "{}", s)
161        } else {
162            write!(f, "{}", hex::encode(self.0))
163        }
164    }
165}
166
167impl From<&str> for ProtocolId {
168    fn from(s: &str) -> Self {
169        Self::from_static(s.as_bytes())
170    }
171}
172
173impl From<[u8; 16]> for ProtocolId {
174    fn from(bytes: [u8; 16]) -> Self {
175        Self(bytes)
176    }
177}
178
179// ============================================================================
180// Peer Capabilities
181// ============================================================================
182
/// Coarse NAT classification, used as a hint when choosing a connection strategy.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NatHint {
    /// No NAT was detected: public IP with direct connectivity.
    None,
    /// Full cone NAT, the easiest kind to traverse.
    FullCone,
    /// Cone NAT restricted by remote address.
    AddressRestrictedCone,
    /// Cone NAT restricted by remote port.
    PortRestrictedCone,
    /// Symmetric NAT, the hardest kind to traverse; a relay may be needed.
    Symmetric,
    /// The NAT type is not known.
    Unknown,
}
199
200impl Default for NatHint {
201    fn default() -> Self {
202        Self::Unknown
203    }
204}
205
206/// Capabilities and quality metrics for a connected peer.
207///
208/// This struct captures both static capabilities (what the peer can do)
209/// and dynamic metrics (how well the peer is performing).
210#[derive(Debug, Clone)]
211pub struct Capabilities {
212    /// Whether this peer can relay traffic for NAT traversal.
213    pub supports_relay: bool,
214
215    /// Whether this peer can coordinate NAT hole-punching.
216    pub supports_coordination: bool,
217
218    /// Observed external addresses for this peer.
219    pub observed_addrs: Vec<SocketAddr>,
220
221    /// Protocols this peer advertises support for.
222    pub protocols: Vec<ProtocolId>,
223
224    /// Last time we successfully communicated with this peer.
225    pub last_seen: SystemTime,
226
227    /// Median round-trip time in milliseconds (p50).
228    pub rtt_ms_p50: u32,
229
230    /// Estimated RTT jitter in milliseconds.
231    pub rtt_jitter_ms: u32,
232
233    /// Packet loss rate (0.0 to 1.0).
234    pub packet_loss: f32,
235
236    /// Inferred NAT type for connection strategy hints.
237    pub nat_type_hint: Option<NatHint>,
238
239    /// Peer's advertised bandwidth limit (bytes/sec), if any.
240    pub bandwidth_limit: Option<u64>,
241
242    /// Number of successful connections to this peer.
243    pub successful_connections: u32,
244
245    /// Number of failed connection attempts to this peer.
246    pub failed_connections: u32,
247
248    /// Whether this peer is currently connected.
249    pub is_connected: bool,
250}
251
252impl Default for Capabilities {
253    fn default() -> Self {
254        Self {
255            supports_relay: false,
256            supports_coordination: false,
257            observed_addrs: Vec::new(),
258            protocols: Vec::new(),
259            last_seen: SystemTime::UNIX_EPOCH,
260            rtt_ms_p50: 0,
261            rtt_jitter_ms: 0,
262            packet_loss: 0.0,
263            nat_type_hint: None,
264            bandwidth_limit: None,
265            successful_connections: 0,
266            failed_connections: 0,
267            is_connected: false,
268        }
269    }
270}
271
272impl Capabilities {
273    /// Create capabilities for a newly connected peer.
274    pub fn new_connected(addr: SocketAddr) -> Self {
275        Self {
276            observed_addrs: vec![addr],
277            last_seen: SystemTime::now(),
278            is_connected: true,
279            ..Default::default()
280        }
281    }
282
283    /// Calculate a quality score for peer selection (0.0 to 1.0).
284    ///
285    /// Higher scores indicate better peers for connection.
286    pub fn quality_score(&self) -> f32 {
287        let mut score = 0.5; // Base score
288
289        // RTT component (lower is better, max 300ms considered)
290        let rtt_score = 1.0 - (self.rtt_ms_p50 as f32 / 300.0).min(1.0);
291        score += rtt_score * 0.3;
292
293        // Packet loss component
294        let loss_score = 1.0 - self.packet_loss;
295        score += loss_score * 0.2;
296
297        // Connection success rate
298        let total = self.successful_connections + self.failed_connections;
299        if total > 0 {
300            let success_rate = self.successful_connections as f32 / total as f32;
301            score += success_rate * 0.2;
302        }
303
304        // Capability bonus
305        if self.supports_relay {
306            score += 0.05;
307        }
308        if self.supports_coordination {
309            score += 0.05;
310        }
311
312        // NAT type penalty
313        if let Some(nat) = self.nat_type_hint {
314            match nat {
315                NatHint::None | NatHint::FullCone => {}
316                NatHint::AddressRestrictedCone | NatHint::PortRestrictedCone => {
317                    score -= 0.05;
318                }
319                NatHint::Symmetric => {
320                    score -= 0.15;
321                }
322                NatHint::Unknown => {
323                    score -= 0.02;
324                }
325            }
326        }
327
328        score.clamp(0.0, 1.0)
329    }
330
331    /// Check if this peer supports a specific protocol.
332    pub fn supports_protocol(&self, proto: &ProtocolId) -> bool {
333        self.protocols.contains(proto)
334    }
335}
336
337// ============================================================================
338// Link Events
339// ============================================================================
340
/// Why a peer connection ended.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DisconnectReason {
    /// The local side initiated a clean shutdown.
    LocalClose,
    /// The remote side initiated a clean shutdown.
    RemoteClose,
    /// The connection timed out.
    Timeout,
    /// A transport error occurred; the payload is its description.
    TransportError(String),
    /// The application closed the connection with this error code.
    ApplicationError(u64),
    /// The connection was reset.
    Reset,
}
357
358/// Events emitted by the link transport layer.
359///
360/// These events notify the overlay about significant transport-level changes.
361#[derive(Debug, Clone)]
362pub enum LinkEvent {
363    /// A new peer has connected.
364    PeerConnected {
365        /// The connected peer's ID.
366        peer: PeerId,
367        /// Initial capabilities (may be updated later).
368        caps: Capabilities,
369    },
370
371    /// A peer has disconnected.
372    PeerDisconnected {
373        /// The disconnected peer's ID.
374        peer: PeerId,
375        /// Reason for disconnection.
376        reason: DisconnectReason,
377    },
378
379    /// Our observed external address has been updated.
380    ExternalAddressUpdated {
381        /// The new external address.
382        addr: SocketAddr,
383    },
384
385    /// A peer's capabilities have been updated.
386    CapabilityUpdated {
387        /// The peer whose capabilities changed.
388        peer: PeerId,
389        /// Updated capabilities.
390        caps: Capabilities,
391    },
392
393    /// A relay request has been received.
394    RelayRequest {
395        /// Peer requesting the relay.
396        from: PeerId,
397        /// Target peer for the relay.
398        to: PeerId,
399        /// Bytes remaining in relay budget.
400        budget_bytes: u64,
401    },
402
403    /// A NAT traversal coordination request has been received.
404    CoordinationRequest {
405        /// First peer in the coordination.
406        peer_a: PeerId,
407        /// Second peer in the coordination.
408        peer_b: PeerId,
409        /// Coordination round number.
410        round: u64,
411    },
412
413    /// The bootstrap cache has been updated.
414    BootstrapCacheUpdated {
415        /// Number of peers in the cache.
416        peer_count: usize,
417    },
418}
419
420// ============================================================================
421// Link Transport Errors
422// ============================================================================
423
424/// Errors that can occur during link transport operations.
425#[derive(Debug, Error, Clone)]
426pub enum LinkError {
427    /// The connection was closed.
428    #[error("connection closed")]
429    ConnectionClosed,
430
431    /// Failed to establish connection.
432    #[error("connection failed: {0}")]
433    ConnectionFailed(String),
434
435    /// The peer is not known/reachable.
436    #[error("peer not found: {0}")]
437    PeerNotFound(String),
438
439    /// Protocol negotiation failed.
440    #[error("protocol not supported: {0}")]
441    ProtocolNotSupported(ProtocolId),
442
443    /// A timeout occurred.
444    #[error("operation timed out")]
445    Timeout,
446
447    /// The stream was reset by the peer.
448    #[error("stream reset: error code {0}")]
449    StreamReset(u64),
450
451    /// An I/O error occurred.
452    #[error("I/O error: {0}")]
453    Io(String),
454
455    /// The transport is shutting down.
456    #[error("transport shutdown")]
457    Shutdown,
458
459    /// Rate limit exceeded.
460    #[error("rate limit exceeded")]
461    RateLimited,
462
463    /// Internal error.
464    #[error("internal error: {0}")]
465    Internal(String),
466}
467
468impl From<std::io::Error> for LinkError {
469    fn from(e: std::io::Error) -> Self {
470        Self::Io(e.to_string())
471    }
472}
473
474/// Result type for link transport operations.
475pub type LinkResult<T> = Result<T, LinkError>;
476
477// ============================================================================
478// Link Connection Trait
479// ============================================================================
480
/// A heap-allocated, pinned, `Send` future yielding `T`, valid for lifetime `'a`.
///
/// Used as the return type of the async-style trait methods in this module.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
483
484/// A boxed stream for async iteration.
485pub type BoxStream<'a, T> = Pin<Box<dyn futures_util::Stream<Item = T> + Send + 'a>>;
486
487/// A connection to a remote peer.
488///
489/// This trait abstracts a single QUIC connection, providing methods to
490/// open streams and send/receive datagrams.
491pub trait LinkConn: Send + Sync {
492    /// Get the remote peer's ID.
493    fn peer(&self) -> PeerId;
494
495    /// Get the remote peer's address.
496    fn remote_addr(&self) -> SocketAddr;
497
498    /// Open a unidirectional stream (send only).
499    fn open_uni(&self) -> BoxFuture<'_, LinkResult<Box<dyn LinkSendStream>>>;
500
501    /// Open a bidirectional stream.
502    fn open_bi(&self) -> BoxFuture<'_, LinkResult<(Box<dyn LinkSendStream>, Box<dyn LinkRecvStream>)>>;
503
504    /// Send an unreliable datagram to the peer.
505    fn send_datagram(&self, data: Bytes) -> LinkResult<()>;
506
507    /// Receive datagrams from the peer.
508    fn recv_datagrams(&self) -> BoxStream<'_, Bytes>;
509
510    /// Close the connection with an error code.
511    fn close(&self, error_code: u64, reason: &str);
512
513    /// Check if the connection is still open.
514    fn is_open(&self) -> bool;
515
516    /// Get connection statistics.
517    fn stats(&self) -> ConnectionStats;
518}
519
/// Per-connection statistics; all fields default to zero.
#[derive(Debug, Clone, Default)]
pub struct ConnectionStats {
    /// Total bytes sent on the connection.
    pub bytes_sent: u64,
    /// Total bytes received on the connection.
    pub bytes_received: u64,
    /// Current estimate of the round-trip time.
    pub rtt: Duration,
    /// How long the connection has been up.
    pub connected_duration: Duration,
    /// Count of streams opened on the connection.
    pub streams_opened: u64,
    /// Estimated count of lost packets.
    pub packets_lost: u64,
}
536
537/// A send stream for writing data to a peer.
538pub trait LinkSendStream: Send + Sync {
539    /// Write data to the stream.
540    fn write<'a>(&'a mut self, data: &'a [u8]) -> BoxFuture<'a, LinkResult<usize>>;
541
542    /// Write all data to the stream.
543    fn write_all<'a>(&'a mut self, data: &'a [u8]) -> BoxFuture<'a, LinkResult<()>>;
544
545    /// Finish the stream (signal end of data).
546    fn finish(&mut self) -> LinkResult<()>;
547
548    /// Reset the stream with an error code.
549    fn reset(&mut self, error_code: u64) -> LinkResult<()>;
550
551    /// Get the stream ID.
552    fn id(&self) -> u64;
553}
554
555/// A receive stream for reading data from a peer.
556pub trait LinkRecvStream: Send + Sync {
557    /// Read data from the stream.
558    fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> BoxFuture<'a, LinkResult<Option<usize>>>;
559
560    /// Read all data until the stream ends.
561    fn read_to_end(&mut self, size_limit: usize) -> BoxFuture<'_, LinkResult<Vec<u8>>>;
562
563    /// Stop receiving data (signal we don't want more).
564    fn stop(&mut self, error_code: u64) -> LinkResult<()>;
565
566    /// Get the stream ID.
567    fn id(&self) -> u64;
568}
569
570// ============================================================================
571// Link Transport Trait
572// ============================================================================
573
574/// Incoming connection stream.
575pub type Incoming<C> = BoxStream<'static, LinkResult<C>>;
576
577/// The primary transport abstraction for overlay networks.
578///
579/// This trait provides everything an overlay needs to establish connections,
580/// send/receive data, and monitor the transport layer.
581///
582/// # Implementation Notes
583///
584/// Implementors should:
585/// - Handle NAT traversal transparently
586/// - Maintain a peer table with capabilities
587/// - Emit events for connection state changes
588/// - Support protocol multiplexing
589///
590/// # Example Implementation
591///
592/// The default implementation wraps [`P2pEndpoint`]:
593///
594/// ```rust,ignore
595/// let config = P2pConfig::builder()
596///     .bind_addr("0.0.0.0:0".parse()?)
597///     .build()?;
598/// let endpoint = P2pEndpoint::new(config).await?;
599/// let transport: Arc<dyn LinkTransport<Conn = P2pLinkConn>> = Arc::new(endpoint);
600/// ```
601pub trait LinkTransport: Send + Sync + 'static {
602    /// The connection type returned by this transport.
603    type Conn: LinkConn + 'static;
604
605    /// Get our local peer ID.
606    fn local_peer(&self) -> PeerId;
607
608    /// Get our observed external address (if known).
609    fn external_address(&self) -> Option<SocketAddr>;
610
611    /// Get the current peer table with capabilities.
612    ///
613    /// This returns all known peers, including disconnected ones
614    /// that are still in the bootstrap cache.
615    fn peer_table(&self) -> Vec<(PeerId, Capabilities)>;
616
617    /// Get capabilities for a specific peer.
618    fn peer_capabilities(&self, peer: &PeerId) -> Option<Capabilities>;
619
620    /// Subscribe to transport events.
621    fn subscribe(&self) -> broadcast::Receiver<LinkEvent>;
622
623    /// Accept incoming connections for a specific protocol.
624    fn accept(&self, proto: ProtocolId) -> Incoming<Self::Conn>;
625
626    /// Dial a peer to establish a connection.
627    ///
628    /// This may involve NAT traversal, which is handled transparently.
629    fn dial(&self, peer: PeerId, proto: ProtocolId) -> BoxFuture<'_, LinkResult<Self::Conn>>;
630
631    /// Dial a peer by address (for bootstrap).
632    fn dial_addr(&self, addr: SocketAddr, proto: ProtocolId) -> BoxFuture<'_, LinkResult<Self::Conn>>;
633
634    /// Get the list of protocols we support.
635    fn supported_protocols(&self) -> Vec<ProtocolId>;
636
637    /// Register a protocol as supported.
638    fn register_protocol(&self, proto: ProtocolId);
639
640    /// Unregister a protocol.
641    fn unregister_protocol(&self, proto: ProtocolId);
642
643    /// Check if we're connected to a peer.
644    fn is_connected(&self, peer: &PeerId) -> bool;
645
646    /// Get the number of active connections.
647    fn active_connections(&self) -> usize;
648
649    /// Gracefully shutdown the transport.
650    fn shutdown(&self) -> BoxFuture<'_, ()>;
651}
652
653// ============================================================================
654// P2pEndpoint Implementation
655// ============================================================================
656
657// The implementation of LinkTransport for P2pEndpoint is in a separate file
658// to keep this module focused on the trait definitions.
659
#[cfg(test)]
mod tests {
    use super::*;

    /// A name shorter than 16 bytes is copied verbatim and zero-padded.
    #[test]
    fn test_protocol_id_from_string() {
        let ProtocolId(raw) = ProtocolId::from("saorsa-dht/1.0");
        let (name, padding) = raw.split_at(14);
        assert_eq!(name, b"saorsa-dht/1.0");
        assert_eq!(padding.len(), 2);
        assert!(padding.iter().all(|&byte| byte == 0));
    }

    /// A name longer than 16 bytes keeps only its first 16 bytes.
    #[test]
    fn test_protocol_id_truncation() {
        let proto = ProtocolId::from("this-is-a-very-long-protocol-name");
        assert_eq!(proto.as_bytes(), b"this-is-a-very-l");
    }

    /// `Display` prints the name without the zero padding.
    #[test]
    fn test_protocol_id_display() {
        let rendered = ProtocolId::from("test/1.0").to_string();
        assert_eq!(rendered, "test/1.0");
    }

    /// RTT and NAT type move the quality score in the expected direction.
    #[test]
    fn test_capabilities_quality_score() {
        // Scores a default peer with only RTT and NAT hint overridden.
        let score_for = |rtt_ms_p50: u32, nat_type_hint: Option<NatHint>| {
            Capabilities {
                rtt_ms_p50,
                nat_type_hint,
                ..Default::default()
            }
            .quality_score()
        };

        // Defaults have 0ms RTT and no packet loss:
        // 0.5 (base) + 0.3 (RTT) + 0.2 (loss) = 1.0
        let base_score = score_for(0, None);
        assert!((0.9..=1.0).contains(&base_score), "base_score = {}", base_score);

        // Half of the maximum considered RTT lowers the score.
        let worse_rtt_score = score_for(150, None);
        assert!(worse_rtt_score < base_score, "worse RTT should reduce score");

        // An RTT beyond the maximum lowers it further.
        let bad_rtt_score = score_for(500, None);
        assert!(bad_rtt_score < worse_rtt_score, "bad RTT should reduce score more");

        // With equal RTT, a symmetric NAT scores below no NAT hint.
        let nat_score = score_for(50, Some(NatHint::Symmetric));
        let no_nat_score = score_for(50, None);
        assert!(nat_score < no_nat_score, "symmetric NAT should reduce score");
    }

    /// Only advertised protocols are reported as supported.
    #[test]
    fn test_capabilities_supports_protocol() {
        let dht = ProtocolId::from("dht/1.0");
        let gossip = ProtocolId::from("gossip/1.0");
        let caps = Capabilities {
            protocols: vec![dht],
            ..Default::default()
        };

        assert!(caps.supports_protocol(&dht));
        assert!(!caps.supports_protocol(&gossip));
    }
}
725}