ant_quic/link_transport.rs
1// Copyright 2024 Saorsa Labs Ltd.
2//
3// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
4// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
5//
6// Full details available at https://saorsalabs.com/licenses
7
8//! # Link Transport Abstraction Layer
9//!
10//! This module provides the [`LinkTransport`] and [`LinkConn`] traits that abstract
11//! the transport layer for overlay networks like saorsa-core. This enables:
12//!
13//! - **Version decoupling**: Overlays can compile against a stable trait interface
14//! while ant-quic evolves underneath
15//! - **Testing**: Mock transports for unit testing overlay logic
16//! - **Alternative transports**: Future support for WebRTC, TCP fallback, etc.
17//!
18//! ## Architecture
19//!
20//! ```text
21//! ┌─────────────────────────────────────────────────────────────────┐
22//! │ saorsa-core (overlay) │
23//! │ DHT routing │ Record storage │ Greedy routing │ Naming │
24//! └─────────────────────────────────────────────────────────────────┘
25//! │
26//! ▼
27//! ┌─────────────────────────────────────────────────────────────────┐
28//! │ LinkTransport trait │
29//! │ local_peer() │ peer_table() │ dial() │ accept() │ subscribe() │
30//! └─────────────────────────────────────────────────────────────────┘
31//! │
32//! ▼
33//! ┌─────────────────────────────────────────────────────────────────┐
34//! │ ant-quic P2pEndpoint │
35//! │ QUIC transport │ NAT traversal │ PQC │ Connection management │
36//! └─────────────────────────────────────────────────────────────────┘
37//! ```
38//!
39//! ## Example
40//!
41//! ```rust,ignore
42//! use ant_quic::link_transport::{LinkTransport, LinkConn, ProtocolId};
43//!
44//! // Define your overlay's protocol
45//! const DHT_PROTOCOL: ProtocolId = ProtocolId::from_static(b"saorsa-dht/1.0.0");
46//!
47//! async fn run_overlay<T: LinkTransport>(transport: T) -> anyhow::Result<()> {
48//! // Accept incoming connections for our protocol
49//! let mut incoming = transport.accept(DHT_PROTOCOL);
50//!
51//! // Dial a peer
52//! let peer_id = /* ... */;
53//! let conn = transport.dial(peer_id, DHT_PROTOCOL).await?;
54//!
55//! // Open a bidirectional stream
56//! let (send, recv) = conn.open_bi().await?;
57//!
58//! Ok(())
59//! }
60//! ```
61
62use std::fmt;
63use std::future::Future;
64use std::net::SocketAddr;
65use std::pin::Pin;
66use std::sync::Arc;
67use std::time::{Duration, Instant, SystemTime};
68
69use bytes::Bytes;
70use thiserror::Error;
71use tokio::sync::broadcast;
72
73use crate::nat_traversal_api::PeerId;
74
75// ============================================================================
76// Protocol Identifier
77// ============================================================================
78
/// Protocol identifier for multiplexing multiple overlays on a single transport.
///
/// A `ProtocolId` is a newtype over a fixed 16-byte array. Equality and hashing
/// are derived, so two identifiers are equal only when all 16 bytes match.
/// The type is `Copy`, so it is passed by value throughout this module.
///
/// The `Debug` and `Display` implementations print the bytes as text when they
/// form valid UTF-8, which keeps identifiers human-readable while debugging.
///
/// # Examples
///
/// ```rust
/// use ant_quic::link_transport::ProtocolId;
///
/// // From a static string (padded/truncated to 16 bytes)
/// const DHT: ProtocolId = ProtocolId::from_static(b"saorsa-dht/1.0.0");
///
/// // From bytes
/// let proto = ProtocolId::new([0x73, 0x61, 0x6f, 0x72, 0x73, 0x61, 0x2d, 0x64,
///                              0x68, 0x74, 0x2f, 0x31, 0x2e, 0x30, 0x2e, 0x30]);
/// ```
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct ProtocolId(pub [u8; 16]);
98
99impl ProtocolId {
100 /// Create a new protocol ID from raw bytes.
101 #[inline]
102 pub const fn new(bytes: [u8; 16]) -> Self {
103 Self(bytes)
104 }
105
106 /// Create a protocol ID from a static byte string.
107 ///
108 /// The string is padded with zeros if shorter than 16 bytes,
109 /// or truncated if longer.
110 #[inline]
111 pub const fn from_static(s: &[u8]) -> Self {
112 let mut bytes = [0u8; 16];
113 let len = if s.len() < 16 { s.len() } else { 16 };
114 let mut i = 0;
115 while i < len {
116 bytes[i] = s[i];
117 i += 1;
118 }
119 Self(bytes)
120 }
121
122 /// Get the raw bytes of this protocol ID.
123 #[inline]
124 pub const fn as_bytes(&self) -> &[u8; 16] {
125 &self.0
126 }
127
128 /// The default protocol for connections without explicit protocol negotiation.
129 pub const DEFAULT: Self = Self::from_static(b"ant-quic/default");
130
131 /// Protocol ID for NAT traversal coordination messages.
132 pub const NAT_TRAVERSAL: Self = Self::from_static(b"ant-quic/nat");
133
134 /// Protocol ID for relay traffic.
135 pub const RELAY: Self = Self::from_static(b"ant-quic/relay");
136}
137
138impl Default for ProtocolId {
139 fn default() -> Self {
140 Self::DEFAULT
141 }
142}
143
144impl fmt::Debug for ProtocolId {
145 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
146 // Try to display as UTF-8 string, trimming null bytes
147 let end = self.0.iter().position(|&b| b == 0).unwrap_or(16);
148 if let Ok(s) = std::str::from_utf8(&self.0[..end]) {
149 write!(f, "ProtocolId({:?})", s)
150 } else {
151 write!(f, "ProtocolId({:?})", hex::encode(self.0))
152 }
153 }
154}
155
156impl fmt::Display for ProtocolId {
157 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
158 let end = self.0.iter().position(|&b| b == 0).unwrap_or(16);
159 if let Ok(s) = std::str::from_utf8(&self.0[..end]) {
160 write!(f, "{}", s)
161 } else {
162 write!(f, "{}", hex::encode(self.0))
163 }
164 }
165}
166
167impl From<&str> for ProtocolId {
168 fn from(s: &str) -> Self {
169 Self::from_static(s.as_bytes())
170 }
171}
172
173impl From<[u8; 16]> for ProtocolId {
174 fn from(bytes: [u8; 16]) -> Self {
175 Self(bytes)
176 }
177}
178
179// ============================================================================
180// Peer Capabilities
181// ============================================================================
182
/// NAT type classification hint for connection strategy selection.
///
/// The `Default` implementation in this module yields [`NatHint::Unknown`].
/// [`Capabilities::quality_score`] applies a per-variant penalty based on
/// this hint.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NatHint {
    /// No NAT detected (public IP, direct connectivity)
    None,
    /// Full cone NAT (easiest to traverse)
    FullCone,
    /// Address-restricted cone NAT
    AddressRestrictedCone,
    /// Port-restricted cone NAT
    PortRestrictedCone,
    /// Symmetric NAT (hardest to traverse, may require relay)
    Symmetric,
    /// Unknown NAT type
    Unknown,
}
199
200impl Default for NatHint {
201 fn default() -> Self {
202 Self::Unknown
203 }
204}
205
/// Capabilities and quality metrics for a connected peer.
///
/// This struct captures both static capabilities (what the peer can do)
/// and dynamic metrics (how well the peer is performing).
///
/// All fields are public. [`Capabilities::quality_score`] combines
/// `rtt_ms_p50`, `packet_loss`, the connection counters, the relay and
/// coordination flags, and `nat_type_hint` into a single ranking value.
#[derive(Debug, Clone)]
pub struct Capabilities {
    /// Whether this peer can relay traffic for NAT traversal.
    pub supports_relay: bool,

    /// Whether this peer can coordinate NAT hole-punching.
    pub supports_coordination: bool,

    /// Observed external addresses for this peer.
    pub observed_addrs: Vec<SocketAddr>,

    /// Protocols this peer advertises support for.
    ///
    /// Queried by [`Capabilities::supports_protocol`].
    pub protocols: Vec<ProtocolId>,

    /// Last time we successfully communicated with this peer.
    ///
    /// The `Default` value is `SystemTime::UNIX_EPOCH`.
    pub last_seen: SystemTime,

    /// Median round-trip time in milliseconds (p50).
    pub rtt_ms_p50: u32,

    /// Estimated RTT jitter in milliseconds.
    pub rtt_jitter_ms: u32,

    /// Packet loss rate (0.0 to 1.0).
    pub packet_loss: f32,

    /// Inferred NAT type for connection strategy hints.
    ///
    /// `None` means no hint is recorded; this is distinct from
    /// `Some(NatHint::None)`, which records that no NAT was detected.
    pub nat_type_hint: Option<NatHint>,

    /// Peer's advertised bandwidth limit (bytes/sec), if any.
    pub bandwidth_limit: Option<u64>,

    /// Number of successful connections to this peer.
    pub successful_connections: u32,

    /// Number of failed connection attempts to this peer.
    pub failed_connections: u32,

    /// Whether this peer is currently connected.
    pub is_connected: bool,
}
251
252impl Default for Capabilities {
253 fn default() -> Self {
254 Self {
255 supports_relay: false,
256 supports_coordination: false,
257 observed_addrs: Vec::new(),
258 protocols: Vec::new(),
259 last_seen: SystemTime::UNIX_EPOCH,
260 rtt_ms_p50: 0,
261 rtt_jitter_ms: 0,
262 packet_loss: 0.0,
263 nat_type_hint: None,
264 bandwidth_limit: None,
265 successful_connections: 0,
266 failed_connections: 0,
267 is_connected: false,
268 }
269 }
270}
271
272impl Capabilities {
273 /// Create capabilities for a newly connected peer.
274 pub fn new_connected(addr: SocketAddr) -> Self {
275 Self {
276 observed_addrs: vec![addr],
277 last_seen: SystemTime::now(),
278 is_connected: true,
279 ..Default::default()
280 }
281 }
282
283 /// Calculate a quality score for peer selection (0.0 to 1.0).
284 ///
285 /// Higher scores indicate better peers for connection.
286 pub fn quality_score(&self) -> f32 {
287 let mut score = 0.5; // Base score
288
289 // RTT component (lower is better, max 300ms considered)
290 let rtt_score = 1.0 - (self.rtt_ms_p50 as f32 / 300.0).min(1.0);
291 score += rtt_score * 0.3;
292
293 // Packet loss component
294 let loss_score = 1.0 - self.packet_loss;
295 score += loss_score * 0.2;
296
297 // Connection success rate
298 let total = self.successful_connections + self.failed_connections;
299 if total > 0 {
300 let success_rate = self.successful_connections as f32 / total as f32;
301 score += success_rate * 0.2;
302 }
303
304 // Capability bonus
305 if self.supports_relay {
306 score += 0.05;
307 }
308 if self.supports_coordination {
309 score += 0.05;
310 }
311
312 // NAT type penalty
313 if let Some(nat) = self.nat_type_hint {
314 match nat {
315 NatHint::None | NatHint::FullCone => {}
316 NatHint::AddressRestrictedCone | NatHint::PortRestrictedCone => {
317 score -= 0.05;
318 }
319 NatHint::Symmetric => {
320 score -= 0.15;
321 }
322 NatHint::Unknown => {
323 score -= 0.02;
324 }
325 }
326 }
327
328 score.clamp(0.0, 1.0)
329 }
330
331 /// Check if this peer supports a specific protocol.
332 pub fn supports_protocol(&self, proto: &ProtocolId) -> bool {
333 self.protocols.contains(proto)
334 }
335}
336
337// ============================================================================
338// Link Events
339// ============================================================================
340
/// Reason for peer disconnection.
///
/// Carried by [`LinkEvent::PeerDisconnected`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DisconnectReason {
    /// Clean shutdown initiated by local side.
    LocalClose,
    /// Clean shutdown initiated by remote side.
    RemoteClose,
    /// Connection timed out.
    Timeout,
    /// Transport error occurred.
    ///
    /// The payload is a free-form string (presumably a description of the
    /// underlying error; confirm against the code that produces it).
    TransportError(String),
    /// Application-level error code.
    ApplicationError(u64),
    /// Connection was reset.
    Reset,
}
357
/// Events emitted by the link transport layer.
///
/// These events notify the overlay about significant transport-level changes.
/// They are delivered through the `broadcast::Receiver<LinkEvent>` returned by
/// [`LinkTransport::subscribe`]. The enum is `Clone`, so each event owns its
/// payload rather than borrowing it.
#[derive(Debug, Clone)]
pub enum LinkEvent {
    /// A new peer has connected.
    PeerConnected {
        /// The connected peer's ID.
        peer: PeerId,
        /// Initial capabilities (may be updated later).
        caps: Capabilities,
    },

    /// A peer has disconnected.
    PeerDisconnected {
        /// The disconnected peer's ID.
        peer: PeerId,
        /// Reason for disconnection.
        reason: DisconnectReason,
    },

    /// Our observed external address has been updated.
    ExternalAddressUpdated {
        /// The new external address.
        addr: SocketAddr,
    },

    /// A peer's capabilities have been updated.
    CapabilityUpdated {
        /// The peer whose capabilities changed.
        peer: PeerId,
        /// Updated capabilities.
        caps: Capabilities,
    },

    /// A relay request has been received.
    RelayRequest {
        /// Peer requesting the relay.
        from: PeerId,
        /// Target peer for the relay.
        to: PeerId,
        /// Bytes remaining in relay budget.
        budget_bytes: u64,
    },

    /// A NAT traversal coordination request has been received.
    CoordinationRequest {
        /// First peer in the coordination.
        peer_a: PeerId,
        /// Second peer in the coordination.
        peer_b: PeerId,
        /// Coordination round number.
        round: u64,
    },

    /// The bootstrap cache has been updated.
    BootstrapCacheUpdated {
        /// Number of peers in the cache.
        peer_count: usize,
    },
}
419
420// ============================================================================
421// Link Transport Errors
422// ============================================================================
423
/// Errors that can occur during link transport operations.
///
/// The `Display` text of each variant comes from its `#[error(...)]`
/// attribute. The enum derives `Clone`, so variants carry strings and codes
/// rather than source error values; for example, [`LinkError::Io`] holds the
/// message of a `std::io::Error` (see the `From<std::io::Error>` impl).
#[derive(Debug, Error, Clone)]
pub enum LinkError {
    /// The connection was closed.
    #[error("connection closed")]
    ConnectionClosed,

    /// Failed to establish connection.
    ///
    /// The payload is appended to the message after `connection failed: `.
    #[error("connection failed: {0}")]
    ConnectionFailed(String),

    /// The peer is not known/reachable.
    ///
    /// The payload is a string that is appended to the message.
    #[error("peer not found: {0}")]
    PeerNotFound(String),

    /// Protocol negotiation failed.
    ///
    /// The payload is the protocol that was not supported, rendered with its
    /// `Display` impl.
    #[error("protocol not supported: {0}")]
    ProtocolNotSupported(ProtocolId),

    /// A timeout occurred.
    #[error("operation timed out")]
    Timeout,

    /// The stream was reset by the peer.
    ///
    /// The payload is the error code shown in the message.
    #[error("stream reset: error code {0}")]
    StreamReset(u64),

    /// An I/O error occurred.
    ///
    /// The payload is the string form of the original I/O error.
    #[error("I/O error: {0}")]
    Io(String),

    /// The transport is shutting down.
    #[error("transport shutdown")]
    Shutdown,

    /// Rate limit exceeded.
    #[error("rate limit exceeded")]
    RateLimited,

    /// Internal error.
    #[error("internal error: {0}")]
    Internal(String),
}
467
468impl From<std::io::Error> for LinkError {
469 fn from(e: std::io::Error) -> Self {
470 Self::Io(e.to_string())
471 }
472}
473
/// Result type for link transport operations.
///
/// Shorthand for `Result<T, LinkError>`; used as the return type of the
/// fallible methods on the traits in this module.
pub type LinkResult<T> = Result<T, LinkError>;
476
477// ============================================================================
478// Link Connection Trait
479// ============================================================================
480
/// A boxed future for async operations.
///
/// A pinned, heap-allocated, type-erased `Future` that is `Send` and may
/// borrow data for the lifetime `'a`. The traits in this module return it
/// from their asynchronous methods instead of declaring `async fn`.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
483
/// A boxed stream for async iteration.
///
/// A pinned, heap-allocated, type-erased `futures_util::Stream` that is `Send`
/// and may borrow data for the lifetime `'a`.
pub type BoxStream<'a, T> = Pin<Box<dyn futures_util::Stream<Item = T> + Send + 'a>>;
486
/// A connection to a remote peer.
///
/// This trait abstracts a single QUIC connection, providing methods to
/// open streams and send/receive datagrams.
///
/// Every method takes `&self` and the trait requires `Send + Sync`, so one
/// connection value can be shared between tasks. Streams are handed out as
/// boxed trait objects ([`LinkSendStream`] / [`LinkRecvStream`]).
pub trait LinkConn: Send + Sync {
    /// Get the remote peer's ID.
    fn peer(&self) -> PeerId;

    /// Get the remote peer's address.
    fn remote_addr(&self) -> SocketAddr;

    /// Open a unidirectional stream (send only).
    ///
    /// The returned future borrows the connection and resolves to the send
    /// half, or to a [`LinkError`] on failure.
    fn open_uni(&self) -> BoxFuture<'_, LinkResult<Box<dyn LinkSendStream>>>;

    /// Open a bidirectional stream.
    ///
    /// The returned future borrows the connection and resolves to a
    /// `(send, recv)` pair, or to a [`LinkError`] on failure.
    fn open_bi(&self) -> BoxFuture<'_, LinkResult<(Box<dyn LinkSendStream>, Box<dyn LinkRecvStream>)>>;

    /// Send an unreliable datagram to the peer.
    ///
    /// This method is synchronous: it returns a `LinkResult` directly rather
    /// than a future.
    fn send_datagram(&self, data: Bytes) -> LinkResult<()>;

    /// Receive datagrams from the peer.
    ///
    /// The returned stream borrows the connection and yields each datagram as
    /// `Bytes`.
    fn recv_datagrams(&self) -> BoxStream<'_, Bytes>;

    /// Close the connection with an error code.
    ///
    /// `reason` is a textual explanation accompanying `error_code`
    /// (presumably forwarded to the peer; confirm with the implementation).
    fn close(&self, error_code: u64, reason: &str);

    /// Check if the connection is still open.
    fn is_open(&self) -> bool;

    /// Get connection statistics.
    ///
    /// Returns an owned [`ConnectionStats`] value.
    fn stats(&self) -> ConnectionStats;
}
519
/// Statistics for a connection.
///
/// Returned by [`LinkConn::stats`]. `Default` is derived, so a default value
/// has every counter at zero and both durations at zero.
#[derive(Debug, Clone, Default)]
pub struct ConnectionStats {
    /// Bytes sent on this connection.
    pub bytes_sent: u64,
    /// Bytes received on this connection.
    pub bytes_received: u64,
    /// Current round-trip time estimate.
    pub rtt: Duration,
    /// Connection uptime.
    pub connected_duration: Duration,
    /// Number of streams opened.
    pub streams_opened: u64,
    /// Packets lost (estimated).
    pub packets_lost: u64,
}
536
/// A send stream for writing data to a peer.
///
/// The mutating operations take `&mut self`, so a stream has one writer at a
/// time. The futures returned by `write` and `write_all` borrow both the
/// stream and the data buffer until they complete.
pub trait LinkSendStream: Send + Sync {
    /// Write data to the stream.
    ///
    /// Resolves to a `usize` (presumably the number of bytes accepted, which
    /// may be fewer than `data.len()`; confirm with the implementation).
    fn write<'a>(&'a mut self, data: &'a [u8]) -> BoxFuture<'a, LinkResult<usize>>;

    /// Write all data to the stream.
    fn write_all<'a>(&'a mut self, data: &'a [u8]) -> BoxFuture<'a, LinkResult<()>>;

    /// Finish the stream (signal end of data).
    ///
    /// Synchronous: returns a `LinkResult` directly rather than a future.
    fn finish(&mut self) -> LinkResult<()>;

    /// Reset the stream with an error code.
    ///
    /// Synchronous: returns a `LinkResult` directly rather than a future.
    fn reset(&mut self, error_code: u64) -> LinkResult<()>;

    /// Get the stream ID.
    fn id(&self) -> u64;
}
554
/// A receive stream for reading data from a peer.
///
/// The mutating operations take `&mut self`, so a stream has one reader at a
/// time.
pub trait LinkRecvStream: Send + Sync {
    /// Read data from the stream.
    ///
    /// Data is placed into `buf`. The future resolves to `Option<usize>`
    /// (presumably `Some(n)` for `n` bytes read and `None` once the stream
    /// has ended; confirm with the implementation).
    fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> BoxFuture<'a, LinkResult<Option<usize>>>;

    /// Read all data until the stream ends.
    ///
    /// `size_limit` bounds the amount of data read (presumably an error is
    /// returned if the stream exceeds it; confirm with the implementation).
    fn read_to_end(&mut self, size_limit: usize) -> BoxFuture<'_, LinkResult<Vec<u8>>>;

    /// Stop receiving data (signal we don't want more).
    ///
    /// Synchronous: returns a `LinkResult` directly rather than a future.
    fn stop(&mut self, error_code: u64) -> LinkResult<()>;

    /// Get the stream ID.
    fn id(&self) -> u64;
}
569
570// ============================================================================
571// Link Transport Trait
572// ============================================================================
573
/// Incoming connection stream.
///
/// A `'static` boxed stream whose items are `LinkResult<C>`, so each item is
/// either a connection or a [`LinkError`]. Returned by
/// [`LinkTransport::accept`].
pub type Incoming<C> = BoxStream<'static, LinkResult<C>>;
576
/// The primary transport abstraction for overlay networks.
///
/// This trait provides everything an overlay needs to establish connections,
/// send/receive data, and monitor the transport layer.
///
/// Every method takes `&self` and the trait requires `Send + Sync + 'static`,
/// so a transport can be shared (for example behind an `Arc`) between tasks.
///
/// # Implementation Notes
///
/// Implementors should:
/// - Handle NAT traversal transparently
/// - Maintain a peer table with capabilities
/// - Emit events for connection state changes
/// - Support protocol multiplexing
///
/// # Example Implementation
///
/// The default implementation wraps [`P2pEndpoint`]:
///
/// ```rust,ignore
/// let config = P2pConfig::builder()
///     .bind_addr("0.0.0.0:0".parse()?)
///     .build()?;
/// let endpoint = P2pEndpoint::new(config).await?;
/// let transport: Arc<dyn LinkTransport<Conn = P2pLinkConn>> = Arc::new(endpoint);
/// ```
pub trait LinkTransport: Send + Sync + 'static {
    /// The connection type returned by this transport.
    type Conn: LinkConn + 'static;

    /// Get our local peer ID.
    fn local_peer(&self) -> PeerId;

    /// Get our observed external address (if known).
    fn external_address(&self) -> Option<SocketAddr>;

    /// Get the current peer table with capabilities.
    ///
    /// This returns all known peers, including disconnected ones
    /// that are still in the bootstrap cache.
    ///
    /// The result is an owned `Vec`, not a live view of the table.
    fn peer_table(&self) -> Vec<(PeerId, Capabilities)>;

    /// Get capabilities for a specific peer.
    ///
    /// Returns an owned copy, or `None` (presumably when the peer is not
    /// known; confirm with the implementation).
    fn peer_capabilities(&self, peer: &PeerId) -> Option<Capabilities>;

    /// Subscribe to transport events.
    ///
    /// Returns a new `tokio::sync::broadcast` receiver of [`LinkEvent`]s.
    fn subscribe(&self) -> broadcast::Receiver<LinkEvent>;

    /// Accept incoming connections for a specific protocol.
    ///
    /// Returns an [`Incoming`] stream; each item is a connection or an error.
    fn accept(&self, proto: ProtocolId) -> Incoming<Self::Conn>;

    /// Dial a peer to establish a connection.
    ///
    /// This may involve NAT traversal, which is handled transparently.
    fn dial(&self, peer: PeerId, proto: ProtocolId) -> BoxFuture<'_, LinkResult<Self::Conn>>;

    /// Dial a peer by address (for bootstrap).
    fn dial_addr(&self, addr: SocketAddr, proto: ProtocolId) -> BoxFuture<'_, LinkResult<Self::Conn>>;

    /// Get the list of protocols we support.
    fn supported_protocols(&self) -> Vec<ProtocolId>;

    /// Register a protocol as supported.
    ///
    /// Takes `&self`, so implementations need interior mutability for the
    /// protocol set.
    fn register_protocol(&self, proto: ProtocolId);

    /// Unregister a protocol.
    fn unregister_protocol(&self, proto: ProtocolId);

    /// Check if we're connected to a peer.
    fn is_connected(&self, peer: &PeerId) -> bool;

    /// Get the number of active connections.
    fn active_connections(&self) -> usize;

    /// Gracefully shutdown the transport.
    ///
    /// The returned future borrows the transport and resolves to `()`; no
    /// error is reported through this interface.
    fn shutdown(&self) -> BoxFuture<'_, ()>;
}
652
653// ============================================================================
654// P2pEndpoint Implementation
655// ============================================================================
656
657// The implementation of LinkTransport for P2pEndpoint is in a separate file
658// to keep this module focused on the trait definitions.
659
#[cfg(test)]
mod tests {
    use super::*;

    /// A name shorter than 16 bytes is copied verbatim and zero-padded.
    #[test]
    fn test_protocol_id_from_string() {
        let id = ProtocolId::from("saorsa-dht/1.0");
        let (name, padding) = id.0.split_at(14);
        assert_eq!(name, b"saorsa-dht/1.0");
        assert_eq!(padding[0], 0);
        assert_eq!(padding[1], 0);
    }

    /// A name longer than 16 bytes keeps only its first 16 bytes.
    #[test]
    fn test_protocol_id_truncation() {
        let id = ProtocolId::from("this-is-a-very-long-protocol-name");
        assert_eq!(&id.0, b"this-is-a-very-l");
    }

    /// `Display` prints the textual name without the zero padding.
    #[test]
    fn test_protocol_id_display() {
        let id = ProtocolId::from("test/1.0");
        assert_eq!(format!("{}", id), "test/1.0");
    }

    /// The quality score falls as RTT rises and when a symmetric NAT is hinted.
    #[test]
    fn test_capabilities_quality_score() {
        // Defaults mean 0 ms RTT and no loss:
        // 0.5 (base) + 0.3 (RTT) + 0.2 (loss) = 1.0
        let base_score = Capabilities::default().quality_score();
        assert!(
            base_score >= 0.9 && base_score <= 1.0,
            "base_score = {}",
            base_score
        );

        // 150 ms is half of the 300 ms ceiling.
        let half_rtt = Capabilities {
            rtt_ms_p50: 150,
            ..Capabilities::default()
        };
        let worse_rtt_score = half_rtt.quality_score();
        assert!(worse_rtt_score < base_score, "worse RTT should reduce score");

        // 500 ms is beyond the ceiling, so the RTT component contributes nothing.
        let over_ceiling = Capabilities {
            rtt_ms_p50: 500,
            ..Capabilities::default()
        };
        let bad_rtt_score = over_ceiling.quality_score();
        assert!(bad_rtt_score < worse_rtt_score, "bad RTT should reduce score more");

        // Compare at the same RTT so that only the NAT hint differs.
        let behind_symmetric_nat = Capabilities {
            rtt_ms_p50: 50,
            nat_type_hint: Some(NatHint::Symmetric),
            ..Capabilities::default()
        };
        let without_hint = Capabilities {
            rtt_ms_p50: 50,
            nat_type_hint: None,
            ..Capabilities::default()
        };
        let nat_score = behind_symmetric_nat.quality_score();
        let no_nat_score = without_hint.quality_score();
        assert!(nat_score < no_nat_score, "symmetric NAT should reduce score");
    }

    /// Only protocols present in `protocols` are reported as supported.
    #[test]
    fn test_capabilities_supports_protocol() {
        let dht = ProtocolId::from("dht/1.0");
        let gossip = ProtocolId::from("gossip/1.0");

        let caps = Capabilities {
            protocols: vec![dht],
            ..Capabilities::default()
        };

        assert!(caps.supports_protocol(&dht));
        assert!(!caps.supports_protocol(&gossip));
    }
}
725}