ant_quic/
link_transport.rs

1// Copyright 2024 Saorsa Labs Ltd.
2//
3// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
4// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
5//
6// Full details available at https://saorsalabs.com/licenses
7
8//! # Link Transport Abstraction Layer
9//!
10//! This module provides the [`LinkTransport`] and [`LinkConn`] traits that abstract
11//! the transport layer for overlay networks like saorsa-core. This enables:
12//!
13//! - **Version decoupling**: Overlays can compile against a stable trait interface
14//!   while ant-quic evolves underneath
15//! - **Testing**: Mock transports for unit testing overlay logic
16//! - **Alternative transports**: Future support for WebRTC, TCP fallback, etc.
17//!
18//! ## Architecture
19//!
20//! ```text
21//! ┌─────────────────────────────────────────────────────────────────┐
22//! │                      saorsa-core (overlay)                       │
23//! │  DHT routing │ Record storage │ Greedy routing │ Naming         │
24//! └─────────────────────────────────────────────────────────────────┘
25//!                              │
26//!                              ▼
27//! ┌─────────────────────────────────────────────────────────────────┐
28//! │                    LinkTransport trait                          │
29//! │  local_peer() │ peer_table() │ dial() │ accept() │ subscribe()  │
30//! └─────────────────────────────────────────────────────────────────┘
31//!                              │
32//!                              ▼
33//! ┌─────────────────────────────────────────────────────────────────┐
34//! │                  ant-quic P2pEndpoint                            │
35//! │  QUIC transport │ NAT traversal │ PQC │ Connection management   │
36//! └─────────────────────────────────────────────────────────────────┘
37//! ```
38//!
39//! ## Example: Implementing an Overlay
40//!
41//! ```rust,ignore
42//! use ant_quic::link_transport::{LinkTransport, LinkConn, LinkEvent, ProtocolId, LinkError};
43//! use std::sync::Arc;
44//! use futures_util::StreamExt;
45//!
46//! // Define your overlay's protocol identifier
47//! const DHT_PROTOCOL: ProtocolId = ProtocolId::from_static(b"saorsa-dht/1.0.0");
48//!
49//! async fn run_overlay<T: LinkTransport>(transport: Arc<T>) -> anyhow::Result<()> {
50//!     // Register our protocol so peers know we support it
51//!     transport.register_protocol(DHT_PROTOCOL);
52//!
53//!     // Subscribe to transport events for connection lifecycle
54//!     let mut events = transport.subscribe();
55//!     tokio::spawn(async move {
56//!         while let Ok(event) = events.recv().await {
57//!             match event {
58//!                 LinkEvent::PeerConnected { peer, caps } => {
59//!                     println!("New peer: {:?}, relay: {}", peer, caps.supports_relay);
60//!                 }
61//!                 LinkEvent::PeerDisconnected { peer, reason } => {
62//!                     println!("Lost peer: {:?}, reason: {:?}", peer, reason);
63//!                 }
64//!                 _ => {}
65//!             }
66//!         }
67//!     });
68//!
69//!     // Accept incoming connections in a background task
70//!     let transport_clone = transport.clone();
71//!     tokio::spawn(async move {
72//!         let mut incoming = transport_clone.accept(DHT_PROTOCOL);
73//!         while let Some(result) = incoming.next().await {
74//!             match result {
75//!                 Ok(conn) => {
76//!                     println!("Accepted connection from {:?}", conn.peer());
77//!                     // Handle connection...
78//!                 }
79//!                 Err(e) => eprintln!("Accept error: {}", e),
80//!             }
81//!         }
82//!     });
83//!
84//!     // Dial a peer using their PeerId (NAT traversal handled automatically)
85//!     let peers = transport.peer_table();
//!     if let Some((peer_id, _caps)) = peers.first() {
87//!         match transport.dial(*peer_id, DHT_PROTOCOL).await {
88//!             Ok(conn) => {
89//!                 // Open a bidirectional stream for request/response
90//!                 let (mut send, mut recv) = conn.open_bi().await?;
91//!                 send.write_all(b"PING").await?;
92//!                 send.finish()?;
93//!
94//!                 let response = recv.read_to_end(1024).await?;
95//!                 println!("Response: {:?}", response);
96//!             }
97//!             Err(LinkError::PeerNotFound(_)) => {
98//!                 println!("Peer not in table - need to bootstrap");
99//!             }
100//!             Err(e) => eprintln!("Dial failed: {}", e),
101//!         }
102//!     }
103//!
104//!     Ok(())
105//! }
106//! ```
107//!
108//! ## Choosing Stream Types
109//!
110//! - **Bidirectional (`open_bi`)**: Use for request/response patterns where both
111//!   sides send and receive. Example: RPC calls, file transfers with acknowledgment.
112//!
113//! - **Unidirectional (`open_uni`)**: Use for one-way messages where no response
114//!   is needed. Example: event notifications, log streaming, pub/sub.
115//!
116//! - **Datagrams (`send_datagram`)**: Use for small, unreliable messages where
117//!   latency matters more than reliability. Example: heartbeats, real-time metrics.
118//!
119//! ## Error Handling Patterns
120//!
121//! ```rust,ignore
//! use ant_quic::link_transport::{LinkError, LinkResult, LinkTransport, ProtocolId};
//! use ant_quic::nat_traversal_api::PeerId;
//! use std::time::Duration;
123//!
124//! async fn connect_with_retry<T: LinkTransport>(
125//!     transport: &T,
126//!     peer: PeerId,
127//!     proto: ProtocolId,
128//! ) -> LinkResult<T::Conn> {
129//!     for attempt in 1..=3 {
130//!         match transport.dial(peer, proto).await {
131//!             Ok(conn) => return Ok(conn),
132//!             Err(LinkError::PeerNotFound(_)) => {
133//!                 // Peer not in table - can't retry, need bootstrap
134//!                 return Err(LinkError::PeerNotFound(format!("{:?}", peer)));
135//!             }
//!             Err(LinkError::ConnectionFailed(_)) if attempt < 3 => {
137//!                 // Transient failure - retry after delay
138//!                 tokio::time::sleep(Duration::from_millis(100 * attempt as u64)).await;
139//!                 continue;
140//!             }
141//!             Err(LinkError::Timeout) if attempt < 3 => {
142//!                 // NAT traversal may need multiple attempts
143//!                 continue;
144//!             }
145//!             Err(e) => return Err(e),
146//!         }
147//!     }
148//!     Err(LinkError::ConnectionFailed("max retries exceeded".into()))
149//! }
150//! ```
151
152use std::fmt;
153use std::future::Future;
154use std::net::SocketAddr;
155use std::pin::Pin;
156use std::sync::Arc;
157use std::time::{Duration, Instant, SystemTime};
158
159use bytes::Bytes;
160use thiserror::Error;
161use tokio::sync::broadcast;
162
163use crate::nat_traversal_api::PeerId;
164
165// ============================================================================
166// Protocol Identifier
167// ============================================================================
168
/// Identifies an overlay protocol so that several overlays can share one transport.
///
/// The identifier is a fixed 16-byte value: cheap to copy, hash and compare,
/// and normally filled with a readable name that shows up in debug output.
///
/// # Examples
///
/// ```rust
/// use ant_quic::link_transport::ProtocolId;
///
/// // Built from a byte string (zero-padded or truncated to 16 bytes)
/// const DHT: ProtocolId = ProtocolId::from_static(b"saorsa-dht/1.0.0");
///
/// // Built from the raw 16 bytes
/// let proto = ProtocolId::new([0x73, 0x61, 0x6f, 0x72, 0x73, 0x61, 0x2d, 0x64,
///                              0x68, 0x74, 0x2f, 0x31, 0x2e, 0x30, 0x2e, 0x30]);
/// assert_eq!(proto, DHT);
/// ```
#[derive(Copy, Clone, Eq, PartialEq, Hash)]
pub struct ProtocolId(pub [u8; 16]);
188
189impl ProtocolId {
190    /// Create a new protocol ID from raw bytes.
191    #[inline]
192    pub const fn new(bytes: [u8; 16]) -> Self {
193        Self(bytes)
194    }
195
196    /// Create a protocol ID from a static byte string.
197    ///
198    /// The string is padded with zeros if shorter than 16 bytes,
199    /// or truncated if longer.
200    #[inline]
201    pub const fn from_static(s: &[u8]) -> Self {
202        let mut bytes = [0u8; 16];
203        let len = if s.len() < 16 { s.len() } else { 16 };
204        let mut i = 0;
205        while i < len {
206            bytes[i] = s[i];
207            i += 1;
208        }
209        Self(bytes)
210    }
211
212    /// Get the raw bytes of this protocol ID.
213    #[inline]
214    pub const fn as_bytes(&self) -> &[u8; 16] {
215        &self.0
216    }
217
218    /// The default protocol for connections without explicit protocol negotiation.
219    pub const DEFAULT: Self = Self::from_static(b"ant-quic/default");
220
221    /// Protocol ID for NAT traversal coordination messages.
222    pub const NAT_TRAVERSAL: Self = Self::from_static(b"ant-quic/nat");
223
224    /// Protocol ID for relay traffic.
225    pub const RELAY: Self = Self::from_static(b"ant-quic/relay");
226}
227
228impl Default for ProtocolId {
229    fn default() -> Self {
230        Self::DEFAULT
231    }
232}
233
234impl fmt::Debug for ProtocolId {
235    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
236        // Try to display as UTF-8 string, trimming null bytes
237        let end = self.0.iter().position(|&b| b == 0).unwrap_or(16);
238        if let Ok(s) = std::str::from_utf8(&self.0[..end]) {
239            write!(f, "ProtocolId({:?})", s)
240        } else {
241            write!(f, "ProtocolId({:?})", hex::encode(self.0))
242        }
243    }
244}
245
246impl fmt::Display for ProtocolId {
247    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
248        let end = self.0.iter().position(|&b| b == 0).unwrap_or(16);
249        if let Ok(s) = std::str::from_utf8(&self.0[..end]) {
250            write!(f, "{}", s)
251        } else {
252            write!(f, "{}", hex::encode(self.0))
253        }
254    }
255}
256
257impl From<&str> for ProtocolId {
258    fn from(s: &str) -> Self {
259        Self::from_static(s.as_bytes())
260    }
261}
262
263impl From<[u8; 16]> for ProtocolId {
264    fn from(bytes: [u8; 16]) -> Self {
265        Self(bytes)
266    }
267}
268
269// ============================================================================
270// Peer Capabilities
271// ============================================================================
272
/// Coarse classification of a peer's NAT, used as a hint when choosing how
/// to connect to it. Defaults to [`NatHint::Unknown`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
pub enum NatHint {
    /// Not behind a NAT: public IP with direct connectivity.
    None,
    /// Full cone NAT, the easiest kind to traverse.
    FullCone,
    /// Cone NAT that restricts inbound traffic by remote address.
    AddressRestrictedCone,
    /// Cone NAT that restricts inbound traffic by remote address and port.
    PortRestrictedCone,
    /// Symmetric NAT, the hardest kind to traverse; a relay may be needed.
    Symmetric,
    /// NAT type has not been determined.
    #[default]
    Unknown,
}
290
291/// Capabilities and quality metrics for a connected peer.
292///
293/// This struct captures both static capabilities (what the peer can do)
294/// and dynamic metrics (how well the peer is performing).
295#[derive(Debug, Clone)]
296pub struct Capabilities {
297    /// Whether this peer can relay traffic for NAT traversal.
298    pub supports_relay: bool,
299
300    /// Whether this peer can coordinate NAT hole-punching.
301    pub supports_coordination: bool,
302
303    /// Observed external addresses for this peer.
304    pub observed_addrs: Vec<SocketAddr>,
305
306    /// Protocols this peer advertises support for.
307    pub protocols: Vec<ProtocolId>,
308
309    /// Last time we successfully communicated with this peer.
310    pub last_seen: SystemTime,
311
312    /// Median round-trip time in milliseconds (p50).
313    pub rtt_ms_p50: u32,
314
315    /// Estimated RTT jitter in milliseconds.
316    pub rtt_jitter_ms: u32,
317
318    /// Packet loss rate (0.0 to 1.0).
319    pub packet_loss: f32,
320
321    /// Inferred NAT type for connection strategy hints.
322    pub nat_type_hint: Option<NatHint>,
323
324    /// Peer's advertised bandwidth limit (bytes/sec), if any.
325    pub bandwidth_limit: Option<u64>,
326
327    /// Number of successful connections to this peer.
328    pub successful_connections: u32,
329
330    /// Number of failed connection attempts to this peer.
331    pub failed_connections: u32,
332
333    /// Whether this peer is currently connected.
334    pub is_connected: bool,
335}
336
337impl Default for Capabilities {
338    fn default() -> Self {
339        Self {
340            supports_relay: false,
341            supports_coordination: false,
342            observed_addrs: Vec::new(),
343            protocols: Vec::new(),
344            last_seen: SystemTime::UNIX_EPOCH,
345            rtt_ms_p50: 0,
346            rtt_jitter_ms: 0,
347            packet_loss: 0.0,
348            nat_type_hint: None,
349            bandwidth_limit: None,
350            successful_connections: 0,
351            failed_connections: 0,
352            is_connected: false,
353        }
354    }
355}
356
357impl Capabilities {
358    /// Create capabilities for a newly connected peer.
359    pub fn new_connected(addr: SocketAddr) -> Self {
360        Self {
361            observed_addrs: vec![addr],
362            last_seen: SystemTime::now(),
363            is_connected: true,
364            ..Default::default()
365        }
366    }
367
368    /// Calculate a quality score for peer selection (0.0 to 1.0).
369    ///
370    /// Higher scores indicate better peers for connection.
371    pub fn quality_score(&self) -> f32 {
372        let mut score = 0.5; // Base score
373
374        // RTT component (lower is better, max 300ms considered)
375        let rtt_score = 1.0 - (self.rtt_ms_p50 as f32 / 300.0).min(1.0);
376        score += rtt_score * 0.3;
377
378        // Packet loss component
379        let loss_score = 1.0 - self.packet_loss;
380        score += loss_score * 0.2;
381
382        // Connection success rate
383        let total = self.successful_connections + self.failed_connections;
384        if total > 0 {
385            let success_rate = self.successful_connections as f32 / total as f32;
386            score += success_rate * 0.2;
387        }
388
389        // Capability bonus
390        if self.supports_relay {
391            score += 0.05;
392        }
393        if self.supports_coordination {
394            score += 0.05;
395        }
396
397        // NAT type penalty
398        if let Some(nat) = self.nat_type_hint {
399            match nat {
400                NatHint::None | NatHint::FullCone => {}
401                NatHint::AddressRestrictedCone | NatHint::PortRestrictedCone => {
402                    score -= 0.05;
403                }
404                NatHint::Symmetric => {
405                    score -= 0.15;
406                }
407                NatHint::Unknown => {
408                    score -= 0.02;
409                }
410            }
411        }
412
413        score.clamp(0.0, 1.0)
414    }
415
416    /// Check if this peer supports a specific protocol.
417    pub fn supports_protocol(&self, proto: &ProtocolId) -> bool {
418        self.protocols.contains(proto)
419    }
420}
421
422// ============================================================================
423// Link Events
424// ============================================================================
425
/// Why a peer connection ended.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DisconnectReason {
    /// The local side shut the connection down cleanly.
    LocalClose,
    /// The remote side shut the connection down cleanly.
    RemoteClose,
    /// The connection timed out.
    Timeout,
    /// The transport reported an error; the string describes it.
    TransportError(String),
    /// The connection ended with this application-level error code.
    ApplicationError(u64),
    /// The connection was reset.
    Reset,
}
442
443/// Events emitted by the link transport layer.
444///
445/// These events notify the overlay about significant transport-level changes.
446#[derive(Debug, Clone)]
447pub enum LinkEvent {
448    /// A new peer has connected.
449    PeerConnected {
450        /// The connected peer's ID.
451        peer: PeerId,
452        /// Initial capabilities (may be updated later).
453        caps: Capabilities,
454    },
455
456    /// A peer has disconnected.
457    PeerDisconnected {
458        /// The disconnected peer's ID.
459        peer: PeerId,
460        /// Reason for disconnection.
461        reason: DisconnectReason,
462    },
463
464    /// Our observed external address has been updated.
465    ExternalAddressUpdated {
466        /// The new external address.
467        addr: SocketAddr,
468    },
469
470    /// A peer's capabilities have been updated.
471    CapabilityUpdated {
472        /// The peer whose capabilities changed.
473        peer: PeerId,
474        /// Updated capabilities.
475        caps: Capabilities,
476    },
477
478    /// A relay request has been received.
479    RelayRequest {
480        /// Peer requesting the relay.
481        from: PeerId,
482        /// Target peer for the relay.
483        to: PeerId,
484        /// Bytes remaining in relay budget.
485        budget_bytes: u64,
486    },
487
488    /// A NAT traversal coordination request has been received.
489    CoordinationRequest {
490        /// First peer in the coordination.
491        peer_a: PeerId,
492        /// Second peer in the coordination.
493        peer_b: PeerId,
494        /// Coordination round number.
495        round: u64,
496    },
497
498    /// The bootstrap cache has been updated.
499    BootstrapCacheUpdated {
500        /// Number of peers in the cache.
501        peer_count: usize,
502    },
503}
504
505// ============================================================================
506// Link Transport Errors
507// ============================================================================
508
509/// Errors that can occur during link transport operations.
510#[derive(Debug, Error, Clone)]
511pub enum LinkError {
512    /// The connection was closed.
513    #[error("connection closed")]
514    ConnectionClosed,
515
516    /// Failed to establish connection.
517    #[error("connection failed: {0}")]
518    ConnectionFailed(String),
519
520    /// The peer is not known/reachable.
521    #[error("peer not found: {0}")]
522    PeerNotFound(String),
523
524    /// Protocol negotiation failed.
525    #[error("protocol not supported: {0}")]
526    ProtocolNotSupported(ProtocolId),
527
528    /// A timeout occurred.
529    #[error("operation timed out")]
530    Timeout,
531
532    /// The stream was reset by the peer.
533    #[error("stream reset: error code {0}")]
534    StreamReset(u64),
535
536    /// An I/O error occurred.
537    #[error("I/O error: {0}")]
538    Io(String),
539
540    /// The transport is shutting down.
541    #[error("transport shutdown")]
542    Shutdown,
543
544    /// Rate limit exceeded.
545    #[error("rate limit exceeded")]
546    RateLimited,
547
548    /// Internal error.
549    #[error("internal error: {0}")]
550    Internal(String),
551}
552
553impl From<std::io::Error> for LinkError {
554    fn from(e: std::io::Error) -> Self {
555        Self::Io(e.to_string())
556    }
557}
558
559/// Result type for link transport operations.
560pub type LinkResult<T> = Result<T, LinkError>;
561
562// ============================================================================
563// Link Connection Trait
564// ============================================================================
565
/// A heap-allocated, pinned, `Send` future yielding `T`, valid for `'a`.
///
/// Used as the return type of the async-style trait methods in this module.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
568
569/// A boxed stream for async iteration.
570pub type BoxStream<'a, T> = Pin<Box<dyn futures_util::Stream<Item = T> + Send + 'a>>;
571
572/// A connection to a remote peer.
573///
574/// This trait abstracts a single QUIC connection, providing methods to
575/// open streams and send/receive datagrams. Connections are obtained via
576/// [`LinkTransport::dial`] or [`LinkTransport::accept`].
577///
578/// # Stream Types
579///
580/// - **Bidirectional streams** (`open_bi`): Both endpoints can send and receive.
581///   Use for request/response patterns.
582/// - **Unidirectional streams** (`open_uni`): Only the opener can send.
583///   Use for notifications or one-way data transfer.
584/// - **Datagrams** (`send_datagram`): Unreliable, unordered messages.
585///   Use for real-time data where latency > reliability.
586///
587/// # Connection Lifecycle
588///
589/// 1. Connection established (via dial or accept)
590/// 2. Open streams as needed
591/// 3. Close gracefully with `close()` or let it drop
592pub trait LinkConn: Send + Sync {
593    /// Get the remote peer's cryptographic identity.
594    ///
595    /// This is stable across reconnections and network changes.
596    fn peer(&self) -> PeerId;
597
598    /// Get the remote peer's current network address.
599    ///
600    /// Note: This may change during the connection lifetime due to
601    /// NAT rebinding or connection migration.
602    fn remote_addr(&self) -> SocketAddr;
603
604    /// Open a unidirectional stream (send only).
605    ///
606    /// The remote peer will receive this stream via their `accept_uni()`.
607    /// Use for one-way messages like notifications or log streams.
608    ///
609    /// # Example
610    /// ```rust,ignore
611    /// let mut stream = conn.open_uni().await?;
612    /// stream.write_all(b"notification").await?;
613    /// stream.finish()?; // Signal end of stream
614    /// ```
615    fn open_uni(&self) -> BoxFuture<'_, LinkResult<Box<dyn LinkSendStream>>>;
616
617    /// Open a bidirectional stream for request/response communication.
618    ///
619    /// Returns a (send, recv) pair. Both sides can write and read.
620    /// Use for RPC, file transfers, or any interactive protocol.
621    ///
622    /// # Example
623    /// ```rust,ignore
624    /// let (mut send, mut recv) = conn.open_bi().await?;
625    /// send.write_all(b"request").await?;
626    /// send.finish()?;
627    /// let response = recv.read_to_end(4096).await?;
628    /// ```
629    fn open_bi(
630        &self,
631    ) -> BoxFuture<'_, LinkResult<(Box<dyn LinkSendStream>, Box<dyn LinkRecvStream>)>>;
632
633    /// Send an unreliable datagram to the peer.
634    ///
635    /// Datagrams are:
636    /// - **Unreliable**: May be dropped without notification
637    /// - **Unordered**: May arrive out of order
638    /// - **Size-limited**: Must fit in a single QUIC packet (~1200 bytes)
639    ///
640    /// Use for heartbeats, metrics, or real-time data where occasional
641    /// loss is acceptable.
642    fn send_datagram(&self, data: Bytes) -> LinkResult<()>;
643
644    /// Receive datagrams from the peer.
645    ///
646    /// Returns a stream of datagrams. Each datagram is delivered as-is
647    /// (no framing). The stream ends when the connection closes.
648    fn recv_datagrams(&self) -> BoxStream<'_, Bytes>;
649
650    /// Close the connection gracefully.
651    ///
652    /// # Parameters
653    /// - `error_code`: Application-defined error code (0 = normal close)
654    /// - `reason`: Human-readable reason for debugging
655    fn close(&self, error_code: u64, reason: &str);
656
657    /// Check if the connection is still open.
658    ///
659    /// Returns false after the connection has been closed (locally or remotely)
660    /// or if a fatal error occurred.
661    fn is_open(&self) -> bool;
662
663    /// Get current connection statistics.
664    ///
665    /// Useful for monitoring connection health and debugging performance.
666    fn stats(&self) -> ConnectionStats;
667}
668
/// Counters and timing figures for one connection, as returned by
/// [`LinkConn::stats`].
///
/// Typical uses:
/// - Monitoring connection health
/// - Detecting congestion (high RTT, packet loss)
/// - Debugging performance issues
///
/// # Typical Values
///
/// | Metric | Good | Concerning | Critical |
/// |--------|------|------------|----------|
/// | RTT | <50ms | 50-200ms | >500ms |
/// | Packet loss | <0.1% | 0.1-1% | >5% |
///
/// # Example
/// ```rust,ignore
/// let stats = conn.stats();
/// if stats.rtt > Duration::from_millis(200) {
///     log::warn!("High latency: {:?}", stats.rtt);
/// }
/// // Only a lost-packet count is tracked (no sent-packet count), so report
/// // the absolute number rather than a loss ratio.
/// if stats.packets_lost > 0 {
///     log::warn!("{} packets lost so far", stats.packets_lost);
/// }
/// ```
#[derive(Clone, Debug, Default)]
pub struct ConnectionStats {
    /// Bytes sent on this connection so far, retransmissions included.
    pub bytes_sent: u64,
    /// Bytes received on this connection so far.
    pub bytes_received: u64,
    /// Current smoothed round-trip time estimate, as produced by QUIC's
    /// RTT estimation algorithm.
    pub rtt: Duration,
    /// Time elapsed since the connection was established.
    pub connected_duration: Duration,
    /// Number of streams opened so far (bidirectional + unidirectional).
    pub streams_opened: u64,
    /// Estimated number of packets lost in transmission. High values point
    /// to congestion or poor network conditions.
    pub packets_lost: u64,
}
710
711/// A send stream for writing data to a peer.
712pub trait LinkSendStream: Send + Sync {
713    /// Write data to the stream.
714    fn write<'a>(&'a mut self, data: &'a [u8]) -> BoxFuture<'a, LinkResult<usize>>;
715
716    /// Write all data to the stream.
717    fn write_all<'a>(&'a mut self, data: &'a [u8]) -> BoxFuture<'a, LinkResult<()>>;
718
719    /// Finish the stream (signal end of data).
720    fn finish(&mut self) -> LinkResult<()>;
721
722    /// Reset the stream with an error code.
723    fn reset(&mut self, error_code: u64) -> LinkResult<()>;
724
725    /// Get the stream ID.
726    fn id(&self) -> u64;
727}
728
729/// A receive stream for reading data from a peer.
730pub trait LinkRecvStream: Send + Sync {
731    /// Read data from the stream.
732    fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> BoxFuture<'a, LinkResult<Option<usize>>>;
733
734    /// Read all data until the stream ends.
735    fn read_to_end(&mut self, size_limit: usize) -> BoxFuture<'_, LinkResult<Vec<u8>>>;
736
737    /// Stop receiving data (signal we don't want more).
738    fn stop(&mut self, error_code: u64) -> LinkResult<()>;
739
740    /// Get the stream ID.
741    fn id(&self) -> u64;
742}
743
744// ============================================================================
745// Link Transport Trait
746// ============================================================================
747
748/// Incoming connection stream.
749pub type Incoming<C> = BoxStream<'static, LinkResult<C>>;
750
751/// The primary transport abstraction for overlay networks.
752///
753/// This trait provides everything an overlay needs to establish connections,
754/// send/receive data, and monitor the transport layer.
755///
756/// # Implementation Notes
757///
758/// Implementors should:
759/// - Handle NAT traversal transparently
760/// - Maintain a peer table with capabilities
761/// - Emit events for connection state changes
762/// - Support protocol multiplexing
763///
764/// # Example Implementation
765///
766/// The default implementation wraps `P2pEndpoint`:
767///
768/// ```rust,ignore
769/// let config = P2pConfig::builder()
770///     .bind_addr("0.0.0.0:0".parse()?)
771///     .build()?;
772/// let endpoint = P2pEndpoint::new(config).await?;
773/// let transport: Arc<dyn LinkTransport<Conn = P2pLinkConn>> = Arc::new(endpoint);
774/// ```
775pub trait LinkTransport: Send + Sync + 'static {
776    /// The connection type returned by this transport.
777    type Conn: LinkConn + 'static;
778
779    /// Get our local peer identity.
780    ///
781    /// This is our stable cryptographic identity, derived from our key pair.
782    /// It remains constant across restarts and network changes.
783    fn local_peer(&self) -> PeerId;
784
785    /// Get our externally observed address, if known.
786    ///
787    /// Returns the address other peers see when we connect to them.
788    /// This is discovered via:
789    /// - OBSERVED_ADDRESS frames from connected peers
790    /// - NAT traversal address discovery
791    ///
792    /// Returns `None` if we haven't connected to any peers yet or
793    /// if we're behind a symmetric NAT that changes our external port.
794    fn external_address(&self) -> Option<SocketAddr>;
795
796    /// Get all known peers with their capabilities.
797    ///
798    /// Includes:
799    /// - Currently connected peers (`caps.is_connected = true`)
800    /// - Previously connected peers still in bootstrap cache
801    /// - Peers learned from relay/coordination traffic
802    ///
803    /// Use `Capabilities::quality_score()` to rank peers for selection.
804    fn peer_table(&self) -> Vec<(PeerId, Capabilities)>;
805
806    /// Get capabilities for a specific peer.
807    ///
808    /// Returns `None` if the peer is not known.
809    fn peer_capabilities(&self, peer: &PeerId) -> Option<Capabilities>;
810
811    /// Subscribe to transport-level events.
812    ///
813    /// Events include peer connections/disconnections, address changes,
814    /// and capability updates. Use for maintaining overlay state.
815    ///
816    /// Multiple subscribers are supported via broadcast channel.
817    fn subscribe(&self) -> broadcast::Receiver<LinkEvent>;
818
819    /// Accept incoming connections for a specific protocol.
820    ///
821    /// Returns a stream of connections from peers that want to speak
822    /// the specified protocol. Register your protocol first with
823    /// `register_protocol()`.
824    ///
825    /// # Example
826    /// ```rust,ignore
827    /// let mut incoming = transport.accept(MY_PROTOCOL);
828    /// while let Some(result) = incoming.next().await {
829    ///     if let Ok(conn) = result {
830    ///         tokio::spawn(handle_connection(conn));
831    ///     }
832    /// }
833    /// ```
834    fn accept(&self, proto: ProtocolId) -> Incoming<Self::Conn>;
835
836    /// Dial a peer by their PeerId (preferred method).
837    ///
838    /// Uses the peer table to find known addresses for this peer.
839    /// NAT traversal is handled automatically - if direct connection
840    /// fails, coordination and hole-punching are attempted.
841    ///
842    /// # Errors
843    /// - `PeerNotFound`: Peer not in table (need to bootstrap)
844    /// - `ConnectionFailed`: Network error (may be transient)
845    /// - `Timeout`: NAT traversal timed out (retry may succeed)
846    fn dial(&self, peer: PeerId, proto: ProtocolId) -> BoxFuture<'_, LinkResult<Self::Conn>>;
847
848    /// Dial a peer by direct address (for bootstrapping).
849    ///
850    /// Use when you don't know the peer's ID yet, such as when
851    /// connecting to a known seed address to join the network.
852    ///
853    /// After connection, the peer's ID will be available via
854    /// `conn.peer()`.
855    fn dial_addr(
856        &self,
857        addr: SocketAddr,
858        proto: ProtocolId,
859    ) -> BoxFuture<'_, LinkResult<Self::Conn>>;
860
861    /// Get protocols we advertise as supported.
862    fn supported_protocols(&self) -> Vec<ProtocolId>;
863
864    /// Register a protocol as supported.
865    ///
866    /// Call this before `accept()` to receive connections for the protocol.
867    /// Registered protocols are advertised to connected peers.
868    fn register_protocol(&self, proto: ProtocolId);
869
870    /// Unregister a protocol.
871    ///
872    /// Stops accepting new connections for this protocol. Existing
873    /// connections are not affected.
874    fn unregister_protocol(&self, proto: ProtocolId);
875
876    /// Check if we have an active connection to a peer.
877    fn is_connected(&self, peer: &PeerId) -> bool;
878
879    /// Get the count of active connections.
880    fn active_connections(&self) -> usize;
881
882    /// Gracefully shutdown the transport.
883    ///
884    /// Closes all connections, stops accepting new ones, and flushes
885    /// the bootstrap cache to disk. Pending operations will complete
886    /// or error.
887    ///
888    /// Call this before exiting to ensure clean shutdown.
889    fn shutdown(&self) -> BoxFuture<'_, ()>;
890}
891
892// ============================================================================
893// P2pEndpoint Implementation
894// ============================================================================
895
896// The implementation of LinkTransport for P2pEndpoint is in a separate file
897// to keep this module focused on the trait definitions.
898
#[cfg(test)]
mod tests {
    use super::*;

    /// A short name is copied verbatim and zero-padded up to 16 bytes.
    #[test]
    fn test_protocol_id_from_string() {
        let id: ProtocolId = "saorsa-dht/1.0".into();
        let (name, padding) = id.0.split_at(14);
        assert_eq!(name, b"saorsa-dht/1.0");
        assert_eq!(padding[0], 0);
        assert_eq!(padding[1], 0);
    }

    /// A name longer than 16 bytes keeps only its first 16 bytes.
    #[test]
    fn test_protocol_id_truncation() {
        let id: ProtocolId = "this-is-a-very-long-protocol-name".into();
        assert_eq!(&id.0, b"this-is-a-very-l");
    }

    /// `Display` prints the name without the zero padding.
    #[test]
    fn test_protocol_id_display() {
        let id: ProtocolId = "test/1.0".into();
        assert_eq!(id.to_string(), "test/1.0");
    }

    /// The quality score drops as RTT grows and when the NAT is symmetric.
    #[test]
    fn test_capabilities_quality_score() {
        // Score of an otherwise-default peer with the given RTT and NAT hint.
        let score_for = |rtt_ms: u32, nat: Option<NatHint>| {
            Capabilities {
                rtt_ms_p50: rtt_ms,
                nat_type_hint: nat,
                ..Capabilities::default()
            }
            .quality_score()
        };

        // Defaults mean perfect RTT (0ms) and no packet loss:
        // 0.5 (base) + 0.3 (RTT: 1.0*0.3) + 0.2 (loss: 1.0*0.2) = 1.0
        let base_score = score_for(0, None);
        assert!(
            (0.9..=1.0).contains(&base_score),
            "base_score = {}",
            base_score
        );

        // 150ms is half of the 300ms ceiling, so the score must drop
        let worse_rtt_score = score_for(150, None);
        assert!(
            worse_rtt_score < base_score,
            "worse RTT should reduce score"
        );

        // 500ms is beyond the ceiling, so the score must drop further
        let bad_rtt_score = score_for(500, None);
        assert!(
            bad_rtt_score < worse_rtt_score,
            "bad RTT should reduce score more"
        );

        // Same RTT with and without a symmetric NAT, for a fair comparison
        let nat_score = score_for(50, Some(NatHint::Symmetric));
        let no_nat_score = score_for(50, None);
        assert!(
            nat_score < no_nat_score,
            "symmetric NAT should reduce score"
        );
    }

    /// Only protocols present in `protocols` are reported as supported.
    #[test]
    fn test_capabilities_supports_protocol() {
        let dht = ProtocolId::from("dht/1.0");
        let gossip = ProtocolId::from("gossip/1.0");

        let caps = Capabilities {
            protocols: vec![dht],
            ..Capabilities::default()
        };

        assert!(caps.supports_protocol(&dht));
        assert!(!caps.supports_protocol(&gossip));
    }
}