Skip to main content

ant_quic/
link_transport.rs

1// Copyright 2024 Saorsa Labs Ltd.
2//
3// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
4// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
5//
6// Full details available at https://saorsalabs.com/licenses
7
8//! # Link Transport Abstraction Layer
9//!
10//! This module provides the [`LinkTransport`] and [`LinkConn`] traits that abstract
11//! the transport layer for overlay networks like saorsa-core. This enables:
12//!
13//! - **Version decoupling**: Overlays can compile against a stable trait interface
14//!   while ant-quic evolves underneath
15//! - **Testing**: Mock transports for unit testing overlay logic
16//! - **Alternative transports**: Future support for WebRTC, TCP fallback, etc.
17//!
18//! ## Architecture
19//!
20//! ```text
21//! ┌─────────────────────────────────────────────────────────────────┐
22//! │                      saorsa-core (overlay)                       │
23//! │  DHT routing │ Record storage │ Greedy routing │ Naming         │
24//! └─────────────────────────────────────────────────────────────────┘
25//!                              │
26//!                              ▼
27//! ┌─────────────────────────────────────────────────────────────────┐
28//! │                    LinkTransport trait                          │
29//! │  local_peer() │ peer_table() │ dial() │ accept() │ subscribe()  │
30//! └─────────────────────────────────────────────────────────────────┘
31//!                              │
32//!                              ▼
33//! ┌─────────────────────────────────────────────────────────────────┐
34//! │                  ant-quic P2pEndpoint                            │
35//! │  QUIC transport │ NAT traversal │ PQC │ Connection management   │
36//! └─────────────────────────────────────────────────────────────────┘
37//! ```
38//!
39//! ## Example: Implementing an Overlay
40//!
41//! ```rust,ignore
42//! use ant_quic::link_transport::{LinkTransport, LinkConn, LinkEvent, ProtocolId, LinkError};
43//! use std::sync::Arc;
44//! use futures_util::StreamExt;
45//!
46//! // Define your overlay's protocol identifier
47//! const DHT_PROTOCOL: ProtocolId = ProtocolId::from_static(b"saorsa-dht/1.0.0");
48//!
49//! async fn run_overlay<T: LinkTransport>(transport: Arc<T>) -> anyhow::Result<()> {
50//!     // Register our protocol so peers know we support it
51//!     transport.register_protocol(DHT_PROTOCOL);
52//!
53//!     // Subscribe to transport events for connection lifecycle
54//!     let mut events = transport.subscribe();
55//!     tokio::spawn(async move {
56//!         while let Ok(event) = events.recv().await {
57//!             match event {
58//!                 LinkEvent::PeerConnected { peer, caps } => {
59//!                     println!("New peer: {:?}, relay: {}", peer, caps.supports_relay);
60//!                 }
61//!                 LinkEvent::PeerDisconnected { peer, reason } => {
62//!                     println!("Lost peer: {:?}, reason: {:?}", peer, reason);
63//!                 }
64//!                 _ => {}
65//!             }
66//!         }
67//!     });
68//!
69//!     // Accept incoming connections in a background task
70//!     let transport_clone = transport.clone();
71//!     tokio::spawn(async move {
72//!         let mut incoming = transport_clone.accept(DHT_PROTOCOL);
73//!         while let Some(result) = incoming.next().await {
74//!             match result {
75//!                 Ok(conn) => {
76//!                     println!("Accepted connection from {:?}", conn.peer());
77//!                     // Handle connection...
78//!                 }
79//!                 Err(e) => eprintln!("Accept error: {}", e),
80//!             }
81//!         }
82//!     });
83//!
84//!     // Dial a peer using their PeerId (NAT traversal handled automatically)
85//!     let peers = transport.peer_table();
86//!     if let Some((peer_id, caps)) = peers.first() {
87//!         match transport.dial(*peer_id, DHT_PROTOCOL).await {
88//!             Ok(conn) => {
89//!                 // Open a bidirectional stream for request/response
90//!                 let (mut send, mut recv) = conn.open_bi().await?;
91//!                 send.write_all(b"PING").await?;
92//!                 send.finish()?;
93//!
94//!                 let response = recv.read_to_end(1024).await?;
95//!                 println!("Response: {:?}", response);
96//!             }
97//!             Err(LinkError::PeerNotFound(_)) => {
98//!                 println!("Peer not in table - need to bootstrap");
99//!             }
100//!             Err(e) => eprintln!("Dial failed: {}", e),
101//!         }
102//!     }
103//!
104//!     Ok(())
105//! }
106//! ```
107//!
108//! ## Choosing Stream Types
109//!
110//! - **Bidirectional (`open_bi`)**: Use for request/response patterns where both
111//!   sides send and receive. Example: RPC calls, file transfers with acknowledgment.
112//!
113//! - **Unidirectional (`open_uni`)**: Use for one-way messages where no response
114//!   is needed. Example: event notifications, log streaming, pub/sub.
115//!
116//! - **Datagrams (`send_datagram`)**: Use for small, unreliable messages where
117//!   latency matters more than reliability. Example: heartbeats, real-time metrics.
118//!
119//! ## Error Handling Patterns
120//!
121//! ```rust,ignore
122//! use ant_quic::link_transport::{LinkError, LinkResult};
123//!
124//! async fn connect_with_retry<T: LinkTransport>(
125//!     transport: &T,
126//!     peer: PeerId,
127//!     proto: ProtocolId,
128//! ) -> LinkResult<T::Conn> {
129//!     for attempt in 1..=3 {
130//!         match transport.dial(peer, proto).await {
131//!             Ok(conn) => return Ok(conn),
132//!             Err(LinkError::PeerNotFound(_)) => {
133//!                 // Peer not in table - can't retry, need bootstrap
134//!                 return Err(LinkError::PeerNotFound(format!("{:?}", peer)));
135//!             }
136//!             Err(LinkError::ConnectionFailed(msg)) if attempt < 3 => {
137//!                 // Transient failure - retry after delay
138//!                 tokio::time::sleep(Duration::from_millis(100 * attempt as u64)).await;
139//!                 continue;
140//!             }
141//!             Err(LinkError::Timeout) if attempt < 3 => {
142//!                 // NAT traversal may need multiple attempts
143//!                 continue;
144//!             }
145//!             Err(e) => return Err(e),
146//!         }
147//!     }
148//!     Err(LinkError::ConnectionFailed("max retries exceeded".into()))
149//! }
150//! ```
151
152use std::collections::HashSet;
153use std::fmt;
154use std::future::Future;
155use std::net::SocketAddr;
156use std::pin::Pin;
157use std::time::{Duration, SystemTime};
158
159use async_trait::async_trait;
160use bytes::Bytes;
161use serde::{Deserialize, Serialize};
162use thiserror::Error;
163use tokio::sync::broadcast;
164
165use crate::nat_traversal_api::PeerId;
166use crate::transport::TransportAddr;
167
168// ============================================================================
169// Stream Type Registry (Protocol Multiplexing)
170// ============================================================================
171
172/// Stream type identifier - the first byte of each QUIC stream.
173///
174/// This enum provides a hardcoded registry of protocol types for multiplexing
175/// multiple protocols over a single QUIC connection. Each stream's first byte
176/// identifies its protocol type.
177///
178/// # Protocol Ranges
179///
180/// | Range | Protocol Family | Types |
181/// |-------|-----------------|-------|
182/// | 0x00-0x0F | Gossip | Membership, PubSub, Bulk |
183/// | 0x10-0x1F | DHT | Query, Store, Witness, Replication |
184/// | 0x20-0x2F | WebRTC | Signal, Media, Data |
185/// | 0xF0-0xFF | Reserved | Future use |
186///
187/// # Example
188///
189/// ```rust
190/// use ant_quic::link_transport::StreamType;
191///
192/// // Check if a byte is a valid stream type
193/// let stream_type = StreamType::from_byte(0x10);
194/// assert_eq!(stream_type, Some(StreamType::DhtQuery));
195///
196/// // Get all gossip types
197/// for st in StreamType::gossip_types() {
198///     println!("Gossip type: {}", st);
199/// }
200/// ```
201#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
202#[repr(u8)]
203pub enum StreamType {
204    // =========================================================================
205    // Gossip Protocols (0x00-0x0F)
206    // =========================================================================
207    /// Membership protocol messages (HyParView, SWIM).
208    Membership = 0x00,
209
210    /// PubSub protocol messages (Plumtree).
211    PubSub = 0x01,
212
213    /// Bulk gossip data transfer (CRDT deltas, large payloads).
214    GossipBulk = 0x02,
215
216    // =========================================================================
217    // DHT Protocols (0x10-0x1F)
218    // =========================================================================
219    /// DHT query operations (GET, FIND_NODE, FIND_VALUE).
220    DhtQuery = 0x10,
221
222    /// DHT store operations (PUT, STORE).
223    DhtStore = 0x11,
224
225    /// DHT witness operations (Byzantine fault tolerance).
226    DhtWitness = 0x12,
227
228    /// DHT replication operations (background repair).
229    DhtReplication = 0x13,
230
231    // =========================================================================
232    // WebRTC Protocols (0x20-0x2F)
233    // =========================================================================
234    /// WebRTC signaling (SDP, ICE candidates via QUIC).
235    WebRtcSignal = 0x20,
236
237    /// WebRTC media streams (audio/video RTP).
238    WebRtcMedia = 0x21,
239
240    /// WebRTC data channels.
241    WebRtcData = 0x22,
242
243    // =========================================================================
244    // Reserved (0xF0-0xFF)
245    // =========================================================================
246    /// Reserved for future protocols.
247    Reserved = 0xF0,
248}
249
250impl StreamType {
251    /// Parse a stream type from its byte value.
252    ///
253    /// Returns `None` for unknown/unassigned values.
254    #[inline]
255    pub fn from_byte(byte: u8) -> Option<Self> {
256        match byte {
257            0x00 => Some(Self::Membership),
258            0x01 => Some(Self::PubSub),
259            0x02 => Some(Self::GossipBulk),
260            0x10 => Some(Self::DhtQuery),
261            0x11 => Some(Self::DhtStore),
262            0x12 => Some(Self::DhtWitness),
263            0x13 => Some(Self::DhtReplication),
264            0x20 => Some(Self::WebRtcSignal),
265            0x21 => Some(Self::WebRtcMedia),
266            0x22 => Some(Self::WebRtcData),
267            0xF0 => Some(Self::Reserved),
268            _ => None,
269        }
270    }
271
272    /// Get the byte value for this stream type.
273    #[inline]
274    pub const fn as_byte(self) -> u8 {
275        self as u8
276    }
277
278    /// Get the protocol family for this stream type.
279    #[inline]
280    pub const fn family(self) -> StreamTypeFamily {
281        match self as u8 {
282            0x00..=0x0F => StreamTypeFamily::Gossip,
283            0x10..=0x1F => StreamTypeFamily::Dht,
284            0x20..=0x2F => StreamTypeFamily::WebRtc,
285            _ => StreamTypeFamily::Reserved,
286        }
287    }
288
289    /// Check if this is a gossip protocol type.
290    #[inline]
291    pub const fn is_gossip(self) -> bool {
292        matches!(self.family(), StreamTypeFamily::Gossip)
293    }
294
295    /// Check if this is a DHT protocol type.
296    #[inline]
297    pub const fn is_dht(self) -> bool {
298        matches!(self.family(), StreamTypeFamily::Dht)
299    }
300
301    /// Check if this is a WebRTC protocol type.
302    #[inline]
303    pub const fn is_webrtc(self) -> bool {
304        matches!(self.family(), StreamTypeFamily::WebRtc)
305    }
306
307    /// Get all gossip stream types.
308    pub const fn gossip_types() -> &'static [StreamType] {
309        &[Self::Membership, Self::PubSub, Self::GossipBulk]
310    }
311
312    /// Get all DHT stream types.
313    pub const fn dht_types() -> &'static [StreamType] {
314        &[
315            Self::DhtQuery,
316            Self::DhtStore,
317            Self::DhtWitness,
318            Self::DhtReplication,
319        ]
320    }
321
322    /// Get all WebRTC stream types.
323    pub const fn webrtc_types() -> &'static [StreamType] {
324        &[Self::WebRtcSignal, Self::WebRtcMedia, Self::WebRtcData]
325    }
326
327    /// Get all defined stream types.
328    pub const fn all_types() -> &'static [StreamType] {
329        &[
330            Self::Membership,
331            Self::PubSub,
332            Self::GossipBulk,
333            Self::DhtQuery,
334            Self::DhtStore,
335            Self::DhtWitness,
336            Self::DhtReplication,
337            Self::WebRtcSignal,
338            Self::WebRtcMedia,
339            Self::WebRtcData,
340            Self::Reserved,
341        ]
342    }
343}
344
345impl fmt::Display for StreamType {
346    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
347        match self {
348            Self::Membership => write!(f, "Membership"),
349            Self::PubSub => write!(f, "PubSub"),
350            Self::GossipBulk => write!(f, "GossipBulk"),
351            Self::DhtQuery => write!(f, "DhtQuery"),
352            Self::DhtStore => write!(f, "DhtStore"),
353            Self::DhtWitness => write!(f, "DhtWitness"),
354            Self::DhtReplication => write!(f, "DhtReplication"),
355            Self::WebRtcSignal => write!(f, "WebRtcSignal"),
356            Self::WebRtcMedia => write!(f, "WebRtcMedia"),
357            Self::WebRtcData => write!(f, "WebRtcData"),
358            Self::Reserved => write!(f, "Reserved"),
359        }
360    }
361}
362
363impl From<StreamType> for u8 {
364    fn from(st: StreamType) -> Self {
365        st as u8
366    }
367}
368
369impl TryFrom<u8> for StreamType {
370    type Error = LinkError;
371
372    fn try_from(byte: u8) -> Result<Self, Self::Error> {
373        Self::from_byte(byte).ok_or(LinkError::InvalidStreamType(byte))
374    }
375}
376
377/// Protocol family for stream types.
378#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
379pub enum StreamTypeFamily {
380    /// Gossip protocols (0x00-0x0F).
381    Gossip,
382    /// DHT protocols (0x10-0x1F).
383    Dht,
384    /// WebRTC protocols (0x20-0x2F).
385    WebRtc,
386    /// Reserved (0xF0-0xFF).
387    Reserved,
388}
389
390impl StreamTypeFamily {
391    /// Get the byte range for this protocol family.
392    pub const fn byte_range(self) -> (u8, u8) {
393        match self {
394            Self::Gossip => (0x00, 0x0F),
395            Self::Dht => (0x10, 0x1F),
396            Self::WebRtc => (0x20, 0x2F),
397            Self::Reserved => (0xF0, 0xFF),
398        }
399    }
400
401    /// Check if a byte is in this family's range.
402    pub const fn contains(self, byte: u8) -> bool {
403        let (start, end) = self.byte_range();
404        byte >= start && byte <= end
405    }
406}
407
408/// A filter for accepting specific stream types.
409///
410/// Use this with `accept_bi_typed` and `accept_uni_typed` to filter
411/// incoming streams by protocol type.
412///
413/// # Example
414///
415/// ```rust
416/// use ant_quic::link_transport::{StreamFilter, StreamType};
417///
418/// // Accept only DHT streams
419/// let filter = StreamFilter::new()
420///     .with_types(StreamType::dht_types());
421///
422/// // Accept gossip and DHT
423/// let filter = StreamFilter::new()
424///     .with_type(StreamType::Membership)
425///     .with_type(StreamType::DhtQuery);
426/// ```
427#[derive(Debug, Clone, Default)]
428pub struct StreamFilter {
429    /// Allowed stream types. Empty means accept all.
430    allowed: HashSet<StreamType>,
431}
432
433impl StreamFilter {
434    /// Create a new empty filter (accepts all types).
435    pub fn new() -> Self {
436        Self::default()
437    }
438
439    /// Create a filter that accepts all stream types.
440    pub fn accept_all() -> Self {
441        let mut filter = Self::new();
442        for st in StreamType::all_types() {
443            filter.allowed.insert(*st);
444        }
445        filter
446    }
447
448    /// Create a filter for gossip streams only.
449    pub fn gossip_only() -> Self {
450        Self::new().with_types(StreamType::gossip_types())
451    }
452
453    /// Create a filter for DHT streams only.
454    pub fn dht_only() -> Self {
455        Self::new().with_types(StreamType::dht_types())
456    }
457
458    /// Create a filter for WebRTC streams only.
459    pub fn webrtc_only() -> Self {
460        Self::new().with_types(StreamType::webrtc_types())
461    }
462
463    /// Add a single stream type to the filter.
464    pub fn with_type(mut self, stream_type: StreamType) -> Self {
465        self.allowed.insert(stream_type);
466        self
467    }
468
469    /// Add multiple stream types to the filter.
470    pub fn with_types(mut self, stream_types: &[StreamType]) -> Self {
471        for st in stream_types {
472            self.allowed.insert(*st);
473        }
474        self
475    }
476
477    /// Check if a stream type is accepted by this filter.
478    pub fn accepts(&self, stream_type: StreamType) -> bool {
479        self.allowed.is_empty() || self.allowed.contains(&stream_type)
480    }
481
482    /// Check if this filter accepts any type (is empty).
483    pub fn accepts_all(&self) -> bool {
484        self.allowed.is_empty()
485    }
486
487    /// Get the set of allowed types.
488    pub fn allowed_types(&self) -> &HashSet<StreamType> {
489        &self.allowed
490    }
491}
492
493// ============================================================================
494// Protocol Identifier
495// ============================================================================
496
497/// Protocol identifier for multiplexing multiple overlays on a single transport.
498///
499/// Protocols are identified by a 16-byte value, allowing efficient binary comparison
500/// while supporting human-readable names during debugging.
501///
502/// # Examples
503///
504/// ```rust
505/// use ant_quic::link_transport::ProtocolId;
506///
507/// // From a static string (padded/truncated to 16 bytes)
508/// const DHT: ProtocolId = ProtocolId::from_static(b"saorsa-dht/1.0.0");
509///
510/// // From bytes
511/// let proto = ProtocolId::new([0x73, 0x61, 0x6f, 0x72, 0x73, 0x61, 0x2d, 0x64,
512///                              0x68, 0x74, 0x2f, 0x31, 0x2e, 0x30, 0x2e, 0x30]);
513/// ```
514#[derive(Clone, Copy, PartialEq, Eq, Hash)]
515pub struct ProtocolId(pub [u8; 16]);
516
517impl ProtocolId {
518    /// Create a new protocol ID from raw bytes.
519    #[inline]
520    pub const fn new(bytes: [u8; 16]) -> Self {
521        Self(bytes)
522    }
523
524    /// Create a protocol ID from a static byte string.
525    ///
526    /// The string is padded with zeros if shorter than 16 bytes,
527    /// or truncated if longer.
528    #[inline]
529    pub const fn from_static(s: &[u8]) -> Self {
530        let mut bytes = [0u8; 16];
531        let len = if s.len() < 16 { s.len() } else { 16 };
532        let mut i = 0;
533        while i < len {
534            bytes[i] = s[i];
535            i += 1;
536        }
537        Self(bytes)
538    }
539
540    /// Get the raw bytes of this protocol ID.
541    #[inline]
542    pub const fn as_bytes(&self) -> &[u8; 16] {
543        &self.0
544    }
545
546    /// The default protocol for connections without explicit protocol negotiation.
547    pub const DEFAULT: Self = Self::from_static(b"ant-quic/default");
548
549    /// Protocol ID for NAT traversal coordination messages.
550    pub const NAT_TRAVERSAL: Self = Self::from_static(b"ant-quic/nat");
551
552    /// Protocol ID for relay traffic.
553    pub const RELAY: Self = Self::from_static(b"ant-quic/relay");
554}
555
556impl Default for ProtocolId {
557    fn default() -> Self {
558        Self::DEFAULT
559    }
560}
561
562impl fmt::Debug for ProtocolId {
563    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
564        // Try to display as UTF-8 string, trimming null bytes
565        let end = self.0.iter().position(|&b| b == 0).unwrap_or(16);
566        if let Ok(s) = std::str::from_utf8(&self.0[..end]) {
567            write!(f, "ProtocolId({:?})", s)
568        } else {
569            write!(f, "ProtocolId({:?})", hex::encode(self.0))
570        }
571    }
572}
573
574impl fmt::Display for ProtocolId {
575    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
576        let end = self.0.iter().position(|&b| b == 0).unwrap_or(16);
577        if let Ok(s) = std::str::from_utf8(&self.0[..end]) {
578            write!(f, "{}", s)
579        } else {
580            write!(f, "{}", hex::encode(self.0))
581        }
582    }
583}
584
585impl From<&str> for ProtocolId {
586    fn from(s: &str) -> Self {
587        Self::from_static(s.as_bytes())
588    }
589}
590
591impl From<[u8; 16]> for ProtocolId {
592    fn from(bytes: [u8; 16]) -> Self {
593        Self(bytes)
594    }
595}
596
597// ============================================================================
598// Peer Capabilities
599// ============================================================================
600
601/// NAT type classification hint for connection strategy selection.
602#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
603pub enum NatHint {
604    /// No NAT detected (public IP, direct connectivity)
605    None,
606    /// Full cone NAT (easiest to traverse)
607    FullCone,
608    /// Address-restricted cone NAT
609    AddressRestrictedCone,
610    /// Port-restricted cone NAT
611    PortRestrictedCone,
612    /// Symmetric NAT (hardest to traverse, may require relay)
613    Symmetric,
614    /// Unknown NAT type
615    #[default]
616    Unknown,
617}
618
619/// Capabilities and quality metrics for a connected peer.
620///
621/// This struct captures both static capabilities (what the peer can do)
622/// and dynamic metrics (how well the peer is performing).
623#[derive(Debug, Clone)]
624pub struct Capabilities {
625    /// Whether this peer can relay traffic for NAT traversal.
626    pub supports_relay: bool,
627
628    /// Whether this peer can coordinate NAT hole-punching.
629    pub supports_coordination: bool,
630
631    /// Observed external addresses for this peer.
632    pub observed_addrs: Vec<SocketAddr>,
633
634    /// Protocols this peer advertises support for.
635    pub protocols: Vec<ProtocolId>,
636
637    /// Last time we successfully communicated with this peer.
638    pub last_seen: SystemTime,
639
640    /// Median round-trip time in milliseconds (p50).
641    pub rtt_ms_p50: u32,
642
643    /// Estimated RTT jitter in milliseconds.
644    pub rtt_jitter_ms: u32,
645
646    /// Packet loss rate (0.0 to 1.0).
647    pub packet_loss: f32,
648
649    /// Inferred NAT type for connection strategy hints.
650    pub nat_type_hint: Option<NatHint>,
651
652    /// Peer's advertised bandwidth limit (bytes/sec), if any.
653    pub bandwidth_limit: Option<u64>,
654
655    /// Number of successful connections to this peer.
656    pub successful_connections: u32,
657
658    /// Number of failed connection attempts to this peer.
659    pub failed_connections: u32,
660
661    /// Whether this peer is currently connected.
662    pub is_connected: bool,
663}
664
665impl Default for Capabilities {
666    fn default() -> Self {
667        Self {
668            supports_relay: false,
669            supports_coordination: false,
670            observed_addrs: Vec::new(),
671            protocols: Vec::new(),
672            last_seen: SystemTime::UNIX_EPOCH,
673            rtt_ms_p50: 0,
674            rtt_jitter_ms: 0,
675            packet_loss: 0.0,
676            nat_type_hint: None,
677            bandwidth_limit: None,
678            successful_connections: 0,
679            failed_connections: 0,
680            is_connected: false,
681        }
682    }
683}
684
685impl Capabilities {
686    /// Create capabilities for a newly connected peer.
687    pub fn new_connected(addr: SocketAddr) -> Self {
688        Self {
689            observed_addrs: vec![addr],
690            last_seen: SystemTime::now(),
691            is_connected: true,
692            ..Default::default()
693        }
694    }
695
696    /// Calculate a quality score for peer selection (0.0 to 1.0).
697    ///
698    /// Higher scores indicate better peers for connection.
699    pub fn quality_score(&self) -> f32 {
700        let mut score = 0.5; // Base score
701
702        // RTT component (lower is better, max 300ms considered)
703        let rtt_score = 1.0 - (self.rtt_ms_p50 as f32 / 300.0).min(1.0);
704        score += rtt_score * 0.3;
705
706        // Packet loss component
707        let loss_score = 1.0 - self.packet_loss;
708        score += loss_score * 0.2;
709
710        // Connection success rate
711        let total = self.successful_connections + self.failed_connections;
712        if total > 0 {
713            let success_rate = self.successful_connections as f32 / total as f32;
714            score += success_rate * 0.2;
715        }
716
717        // Capability bonus
718        if self.supports_relay {
719            score += 0.05;
720        }
721        if self.supports_coordination {
722            score += 0.05;
723        }
724
725        // NAT type penalty
726        if let Some(nat) = self.nat_type_hint {
727            match nat {
728                NatHint::None | NatHint::FullCone => {}
729                NatHint::AddressRestrictedCone | NatHint::PortRestrictedCone => {
730                    score -= 0.05;
731                }
732                NatHint::Symmetric => {
733                    score -= 0.15;
734                }
735                NatHint::Unknown => {
736                    score -= 0.02;
737                }
738            }
739        }
740
741        score.clamp(0.0, 1.0)
742    }
743
744    /// Check if this peer supports a specific protocol.
745    pub fn supports_protocol(&self, proto: &ProtocolId) -> bool {
746        self.protocols.contains(proto)
747    }
748}
749
750// ============================================================================
751// Link Events
752// ============================================================================
753
754/// Reason for peer disconnection.
755#[derive(Debug, Clone, PartialEq, Eq)]
756pub enum DisconnectReason {
757    /// Clean shutdown initiated by local side.
758    LocalClose,
759    /// Clean shutdown initiated by remote side.
760    RemoteClose,
761    /// Connection timed out.
762    Timeout,
763    /// Transport error occurred.
764    TransportError(String),
765    /// Application-level error code.
766    ApplicationError(u64),
767    /// Connection was reset.
768    Reset,
769}
770
771/// Events emitted by the link transport layer.
772///
773/// These events notify the overlay about significant transport-level changes.
774#[derive(Debug, Clone)]
775pub enum LinkEvent {
776    /// A new peer has connected.
777    PeerConnected {
778        /// The connected peer's ID.
779        peer: PeerId,
780        /// Initial capabilities (may be updated later).
781        caps: Capabilities,
782    },
783
784    /// A peer has disconnected.
785    PeerDisconnected {
786        /// The disconnected peer's ID.
787        peer: PeerId,
788        /// Reason for disconnection.
789        reason: DisconnectReason,
790    },
791
792    /// Our observed external address has been updated.
793    ExternalAddressUpdated {
794        /// The new external address (supports all transport types).
795        addr: TransportAddr,
796    },
797
798    /// A peer's capabilities have been updated.
799    CapabilityUpdated {
800        /// The peer whose capabilities changed.
801        peer: PeerId,
802        /// Updated capabilities.
803        caps: Capabilities,
804    },
805
806    /// A relay request has been received.
807    RelayRequest {
808        /// Peer requesting the relay.
809        from: PeerId,
810        /// Target peer for the relay.
811        to: PeerId,
812        /// Bytes remaining in relay budget.
813        budget_bytes: u64,
814    },
815
816    /// A NAT traversal coordination request has been received.
817    CoordinationRequest {
818        /// First peer in the coordination.
819        peer_a: PeerId,
820        /// Second peer in the coordination.
821        peer_b: PeerId,
822        /// Coordination round number.
823        round: u64,
824    },
825
826    /// The bootstrap cache has been updated.
827    BootstrapCacheUpdated {
828        /// Number of peers in the cache.
829        peer_count: usize,
830    },
831}
832
833// ============================================================================
834// Protocol Handler Abstraction
835// ============================================================================
836
837/// Handler for specific protocol stream types.
838///
839/// Implement this trait to handle incoming streams by protocol type.
840/// Each handler declares which [`StreamType`]s it processes and receives
841/// matching streams via [`Self::handle_stream`].
842///
843/// # Example
844///
845/// ```rust,ignore
846/// use ant_quic::link_transport::{ProtocolHandler, StreamType, LinkResult, PeerId};
847/// use async_trait::async_trait;
848/// use bytes::Bytes;
849///
850/// struct GossipHandler;
851///
852/// #[async_trait]
853/// impl ProtocolHandler for GossipHandler {
854///     fn stream_types(&self) -> &[StreamType] {
855///         StreamType::gossip_types()
856///     }
857///
858///     async fn handle_stream(
859///         &self,
860///         peer: PeerId,
861///         stream_type: StreamType,
862///         data: Bytes,
863///     ) -> LinkResult<Option<Bytes>> {
864///         // Process incoming gossip message, optionally return response
865///         Ok(None)
866///     }
867/// }
868/// ```
869#[async_trait]
870pub trait ProtocolHandler: Send + Sync {
871    /// Get the stream types this handler processes.
872    fn stream_types(&self) -> &[StreamType];
873
874    /// Handle an incoming stream.
875    ///
876    /// # Arguments
877    ///
878    /// * `peer` - The peer that sent the stream
879    /// * `stream_type` - The type of stream received
880    /// * `data` - The stream payload data
881    ///
882    /// # Returns
883    ///
884    /// * `Ok(Some(response))` - Send response back to the peer
885    /// * `Ok(None)` - No response (close stream gracefully)
886    /// * `Err(e)` - Handler error (stream closed with error)
887    async fn handle_stream(
888        &self,
889        peer: PeerId,
890        stream_type: StreamType,
891        data: Bytes,
892    ) -> LinkResult<Option<Bytes>>;
893
894    /// Handle an incoming datagram.
895    ///
896    /// Default implementation does nothing. Override for unreliable messaging.
897    async fn handle_datagram(
898        &self,
899        _peer: PeerId,
900        _stream_type: StreamType,
901        _data: Bytes,
902    ) -> LinkResult<()> {
903        Ok(())
904    }
905
906    /// Called when the handler is being shut down.
907    ///
908    /// Default implementation does nothing. Override for cleanup.
909    async fn shutdown(&self) -> LinkResult<()> {
910        Ok(())
911    }
912
913    /// Get a human-readable name for this handler.
914    ///
915    /// Used in logging and debugging.
916    fn name(&self) -> &str {
917        "ProtocolHandler"
918    }
919}
920
921/// A boxed protocol handler for dynamic dispatch.
922pub type BoxedHandler = Box<dyn ProtocolHandler>;
923
924/// Extension trait for creating boxed handlers.
925pub trait ProtocolHandlerExt: ProtocolHandler + Sized + 'static {
926    /// Box this handler for use with [`crate::SharedTransport`].
927    fn boxed(self) -> BoxedHandler {
928        Box::new(self)
929    }
930}
931
932impl<T: ProtocolHandler + 'static> ProtocolHandlerExt for T {}
933
934// ============================================================================
935// Link Transport Errors
936// ============================================================================
937
938/// Errors that can occur during link transport operations.
939#[derive(Debug, Error, Clone)]
940pub enum LinkError {
941    /// The connection was closed.
942    #[error("connection closed")]
943    ConnectionClosed,
944
945    /// Failed to establish connection.
946    #[error("connection failed: {0}")]
947    ConnectionFailed(String),
948
949    /// The peer is not known/reachable.
950    #[error("peer not found: {0}")]
951    PeerNotFound(String),
952
953    /// Protocol negotiation failed.
954    #[error("protocol not supported: {0}")]
955    ProtocolNotSupported(ProtocolId),
956
957    /// A timeout occurred.
958    #[error("operation timed out")]
959    Timeout,
960
961    /// The stream was reset by the peer.
962    #[error("stream reset: error code {0}")]
963    StreamReset(u64),
964
965    /// An I/O error occurred.
966    #[error("I/O error: {0}")]
967    Io(String),
968
969    /// The transport is shutting down.
970    #[error("transport shutdown")]
971    Shutdown,
972
973    /// Rate limit exceeded.
974    #[error("rate limit exceeded")]
975    RateLimited,
976
977    /// Internal error.
978    #[error("internal error: {0}")]
979    Internal(String),
980
981    /// Invalid stream type byte.
982    #[error("invalid stream type byte: 0x{0:02x}")]
983    InvalidStreamType(u8),
984
985    /// Stream type not accepted by filter.
986    #[error("stream type {0} not accepted")]
987    StreamTypeFiltered(StreamType),
988
989    /// Handler already registered for stream type.
990    #[error("handler already exists for stream type: {0}")]
991    HandlerExists(StreamType),
992
993    /// No handler registered for stream type.
994    #[error("no handler for stream type: {0}")]
995    NoHandler(StreamType),
996
997    /// Transport not running.
998    #[error("transport not running")]
999    NotRunning,
1000
1001    /// Transport already running.
1002    #[error("transport already running")]
1003    AlreadyRunning,
1004}
1005
1006impl From<std::io::Error> for LinkError {
1007    fn from(e: std::io::Error) -> Self {
1008        Self::Io(e.to_string())
1009    }
1010}
1011
1012/// Result type for link transport operations.
1013pub type LinkResult<T> = Result<T, LinkError>;
1014
1015// ============================================================================
1016// Link Connection Trait
1017// ============================================================================
1018
1019/// A boxed future for async operations.
1020pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
1021
1022/// A boxed stream for async iteration.
1023pub type BoxStream<'a, T> = Pin<Box<dyn futures_util::Stream<Item = T> + Send + 'a>>;
1024
1025/// A connection to a remote peer.
1026///
1027/// This trait abstracts a single QUIC connection, providing methods to
1028/// open streams and send/receive datagrams. Connections are obtained via
1029/// [`LinkTransport::dial`] or [`LinkTransport::accept`].
1030///
1031/// # Stream Types
1032///
1033/// - **Bidirectional streams** (`open_bi`): Both endpoints can send and receive.
1034///   Use for request/response patterns.
1035/// - **Unidirectional streams** (`open_uni`): Only the opener can send.
1036///   Use for notifications or one-way data transfer.
1037/// - **Datagrams** (`send_datagram`): Unreliable, unordered messages.
1038///   Use for real-time data where latency > reliability.
1039///
1040/// # Connection Lifecycle
1041///
1042/// 1. Connection established (via dial or accept)
1043/// 2. Open streams as needed
1044/// 3. Close gracefully with `close()` or let it drop
1045pub trait LinkConn: Send + Sync {
1046    /// Get the remote peer's cryptographic identity.
1047    ///
1048    /// This is stable across reconnections and network changes.
1049    fn peer(&self) -> PeerId;
1050
1051    /// Get the remote peer's current network address.
1052    ///
1053    /// Note: This may change during the connection lifetime due to
1054    /// NAT rebinding or connection migration.
1055    fn remote_addr(&self) -> SocketAddr;
1056
1057    /// Open a unidirectional stream (send only).
1058    ///
1059    /// The remote peer will receive this stream via their `accept_uni()`.
1060    /// Use for one-way messages like notifications or log streams.
1061    ///
1062    /// # Example
1063    /// ```rust,ignore
1064    /// let mut stream = conn.open_uni().await?;
1065    /// stream.write_all(b"notification").await?;
1066    /// stream.finish()?; // Signal end of stream
1067    /// ```
1068    fn open_uni(&self) -> BoxFuture<'_, LinkResult<Box<dyn LinkSendStream>>>;
1069
1070    /// Open a bidirectional stream for request/response communication.
1071    ///
1072    /// Returns a (send, recv) pair. Both sides can write and read.
1073    /// Use for RPC, file transfers, or any interactive protocol.
1074    ///
1075    /// # Example
1076    /// ```rust,ignore
1077    /// let (mut send, mut recv) = conn.open_bi().await?;
1078    /// send.write_all(b"request").await?;
1079    /// send.finish()?;
1080    /// let response = recv.read_to_end(4096).await?;
1081    /// ```
1082    fn open_bi(
1083        &self,
1084    ) -> BoxFuture<'_, LinkResult<(Box<dyn LinkSendStream>, Box<dyn LinkRecvStream>)>>;
1085
1086    /// Open a typed unidirectional stream.
1087    ///
1088    /// The stream type byte is automatically prepended to the stream.
1089    /// The remote peer should use `accept_uni_typed` to receive.
1090    ///
1091    /// # Example
1092    /// ```rust,ignore
1093    /// let mut stream = conn.open_uni_typed(StreamType::Membership).await?;
1094    /// stream.write_all(b"membership update").await?;
1095    /// stream.finish()?;
1096    /// ```
1097    fn open_uni_typed(
1098        &self,
1099        stream_type: StreamType,
1100    ) -> BoxFuture<'_, LinkResult<Box<dyn LinkSendStream>>>;
1101
1102    /// Open a typed bidirectional stream.
1103    ///
1104    /// The stream type byte is automatically prepended to the stream.
1105    /// The remote peer should use `accept_bi_typed` to receive.
1106    ///
1107    /// # Example
1108    /// ```rust,ignore
1109    /// let (mut send, mut recv) = conn.open_bi_typed(StreamType::DhtQuery).await?;
1110    /// send.write_all(b"query request").await?;
1111    /// send.finish()?;
1112    /// let response = recv.read_to_end(4096).await?;
1113    /// ```
1114    fn open_bi_typed(
1115        &self,
1116        stream_type: StreamType,
1117    ) -> BoxFuture<'_, LinkResult<(Box<dyn LinkSendStream>, Box<dyn LinkRecvStream>)>>;
1118
1119    /// Accept incoming unidirectional streams with type filtering.
1120    ///
1121    /// Returns a stream of (type, recv_stream) pairs for streams
1122    /// matching the filter. Use `StreamFilter::new()` to accept all types.
1123    ///
1124    /// # Example
1125    /// ```rust,ignore
1126    /// let filter = StreamFilter::gossip_only();
1127    /// let mut incoming = conn.accept_uni_typed(filter);
1128    /// while let Some(result) = incoming.next().await {
1129    ///     let (stream_type, recv) = result?;
1130    ///     println!("Got {} stream", stream_type);
1131    /// }
1132    /// ```
1133    fn accept_uni_typed(
1134        &self,
1135        filter: StreamFilter,
1136    ) -> BoxStream<'_, LinkResult<(StreamType, Box<dyn LinkRecvStream>)>>;
1137
1138    /// Accept incoming bidirectional streams with type filtering.
1139    ///
1140    /// Returns a stream of (type, send_stream, recv_stream) tuples for
1141    /// streams matching the filter. Use `StreamFilter::new()` to accept all types.
1142    ///
1143    /// # Example
1144    /// ```rust,ignore
1145    /// let filter = StreamFilter::dht_only();
1146    /// let mut incoming = conn.accept_bi_typed(filter);
1147    /// while let Some(result) = incoming.next().await {
1148    ///     let (stream_type, send, recv) = result?;
1149    ///     // Handle DHT request/response
1150    /// }
1151    /// ```
1152    fn accept_bi_typed(
1153        &self,
1154        filter: StreamFilter,
1155    ) -> BoxStream<'_, LinkResult<(StreamType, Box<dyn LinkSendStream>, Box<dyn LinkRecvStream>)>>;
1156
1157    /// Send an unreliable datagram to the peer.
1158    ///
1159    /// Datagrams are:
1160    /// - **Unreliable**: May be dropped without notification
1161    /// - **Unordered**: May arrive out of order
1162    /// - **Size-limited**: Must fit in a single QUIC packet (~1200 bytes)
1163    ///
1164    /// Use for heartbeats, metrics, or real-time data where occasional
1165    /// loss is acceptable.
1166    fn send_datagram(&self, data: Bytes) -> LinkResult<()>;
1167
1168    /// Receive datagrams from the peer.
1169    ///
1170    /// Returns a stream of datagrams. Each datagram is delivered as-is
1171    /// (no framing). The stream ends when the connection closes.
1172    fn recv_datagrams(&self) -> BoxStream<'_, Bytes>;
1173
1174    /// Close the connection gracefully.
1175    ///
1176    /// # Parameters
1177    /// - `error_code`: Application-defined error code (0 = normal close)
1178    /// - `reason`: Human-readable reason for debugging
1179    fn close(&self, error_code: u64, reason: &str);
1180
1181    /// Check if the connection is still open.
1182    ///
1183    /// Returns false after the connection has been closed (locally or remotely)
1184    /// or if a fatal error occurred.
1185    fn is_open(&self) -> bool;
1186
1187    /// Get current connection statistics.
1188    ///
1189    /// Useful for monitoring connection health and debugging performance.
1190    fn stats(&self) -> ConnectionStats;
1191}
1192
1193/// Statistics for a connection.
1194///
1195/// Updated in real-time as the connection handles data. Use for:
1196/// - Monitoring connection health
1197/// - Detecting congestion (high RTT, packet loss)
1198/// - Debugging performance issues
1199///
1200/// # Typical Values
1201///
1202/// | Metric | Good | Concerning | Critical |
1203/// |--------|------|------------|----------|
1204/// | RTT | <50ms | 50-200ms | >500ms |
1205/// | Packet loss | <0.1% | 0.1-1% | >5% |
1206///
1207/// # Example
1208/// ```rust,ignore
1209/// let stats = conn.stats();
1210/// if stats.rtt > Duration::from_millis(200) {
1211///     log::warn!("High latency: {:?}", stats.rtt);
1212/// }
1213/// if stats.packets_lost > stats.bytes_sent / 100 {
1214///     log::warn!("Significant packet loss detected");
1215/// }
1216/// ```
1217#[derive(Debug, Clone, Default)]
1218pub struct ConnectionStats {
1219    /// Total bytes sent on this connection (including retransmits).
1220    pub bytes_sent: u64,
1221    /// Total bytes received on this connection.
1222    pub bytes_received: u64,
1223    /// Current smoothed round-trip time estimate.
1224    /// Calculated using QUIC's RTT estimation algorithm.
1225    pub rtt: Duration,
1226    /// How long this connection has been established.
1227    pub connected_duration: Duration,
1228    /// Total number of streams opened (bidirectional + unidirectional).
1229    pub streams_opened: u64,
1230    /// Estimated packets lost during transmission.
1231    /// High values indicate congestion or poor network conditions.
1232    pub packets_lost: u64,
1233}
1234
1235/// A send stream for writing data to a peer.
1236pub trait LinkSendStream: Send + Sync {
1237    /// Write data to the stream.
1238    fn write<'a>(&'a mut self, data: &'a [u8]) -> BoxFuture<'a, LinkResult<usize>>;
1239
1240    /// Write all data to the stream.
1241    fn write_all<'a>(&'a mut self, data: &'a [u8]) -> BoxFuture<'a, LinkResult<()>>;
1242
1243    /// Finish the stream (signal end of data).
1244    fn finish(&mut self) -> LinkResult<()>;
1245
1246    /// Reset the stream with an error code.
1247    fn reset(&mut self, error_code: u64) -> LinkResult<()>;
1248
1249    /// Get the stream ID.
1250    fn id(&self) -> u64;
1251}
1252
1253/// A receive stream for reading data from a peer.
1254pub trait LinkRecvStream: Send + Sync {
1255    /// Read data from the stream.
1256    fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> BoxFuture<'a, LinkResult<Option<usize>>>;
1257
1258    /// Read all data until the stream ends.
1259    fn read_to_end(&mut self, size_limit: usize) -> BoxFuture<'_, LinkResult<Vec<u8>>>;
1260
1261    /// Stop receiving data (signal we don't want more).
1262    fn stop(&mut self, error_code: u64) -> LinkResult<()>;
1263
1264    /// Get the stream ID.
1265    fn id(&self) -> u64;
1266}
1267
1268// ============================================================================
1269// Link Transport Trait
1270// ============================================================================
1271
1272/// Incoming connection stream.
1273pub type Incoming<C> = BoxStream<'static, LinkResult<C>>;
1274
1275/// The primary transport abstraction for overlay networks.
1276///
1277/// This trait provides everything an overlay needs to establish connections,
1278/// send/receive data, and monitor the transport layer.
1279///
1280/// # Implementation Notes
1281///
1282/// Implementors should:
1283/// - Handle NAT traversal transparently
1284/// - Maintain a peer table with capabilities
1285/// - Emit events for connection state changes
1286/// - Support protocol multiplexing
1287///
1288/// # Example Implementation
1289///
1290/// The default implementation wraps `P2pEndpoint`:
1291///
1292/// ```rust,ignore
1293/// let config = P2pConfig::builder()
1294///     .bind_addr("0.0.0.0:0".parse()?)
1295///     .build()?;
1296/// let endpoint = P2pEndpoint::new(config).await?;
1297/// let transport: Arc<dyn LinkTransport<Conn = P2pLinkConn>> = Arc::new(endpoint);
1298/// ```
1299pub trait LinkTransport: Send + Sync + 'static {
1300    /// The connection type returned by this transport.
1301    type Conn: LinkConn + 'static;
1302
1303    /// Get our local peer identity.
1304    ///
1305    /// This is our stable cryptographic identity, derived from our key pair.
1306    /// It remains constant across restarts and network changes.
1307    fn local_peer(&self) -> PeerId;
1308
1309    /// Get our externally observed address, if known.
1310    ///
1311    /// Returns the address other peers see when we connect to them.
1312    /// This is discovered via:
1313    /// - OBSERVED_ADDRESS frames from connected peers
1314    /// - NAT traversal address discovery
1315    ///
1316    /// Returns `None` if we haven't connected to any peers yet or
1317    /// if we're behind a symmetric NAT that changes our external port.
1318    fn external_address(&self) -> Option<SocketAddr>;
1319
1320    /// Get all known peers with their capabilities.
1321    ///
1322    /// Includes:
1323    /// - Currently connected peers (`caps.is_connected = true`)
1324    /// - Previously connected peers still in bootstrap cache
1325    /// - Peers learned from relay/coordination traffic
1326    ///
1327    /// Use `Capabilities::quality_score()` to rank peers for selection.
1328    fn peer_table(&self) -> Vec<(PeerId, Capabilities)>;
1329
1330    /// Get capabilities for a specific peer.
1331    ///
1332    /// Returns `None` if the peer is not known.
1333    fn peer_capabilities(&self, peer: &PeerId) -> Option<Capabilities>;
1334
1335    /// Subscribe to transport-level events.
1336    ///
1337    /// Events include peer connections/disconnections, address changes,
1338    /// and capability updates. Use for maintaining overlay state.
1339    ///
1340    /// Multiple subscribers are supported via broadcast channel.
1341    fn subscribe(&self) -> broadcast::Receiver<LinkEvent>;
1342
1343    /// Accept incoming connections for a specific protocol.
1344    ///
1345    /// Returns a stream of connections from peers that want to speak
1346    /// the specified protocol. Register your protocol first with
1347    /// `register_protocol()`.
1348    ///
1349    /// # Example
1350    /// ```rust,ignore
1351    /// let mut incoming = transport.accept(MY_PROTOCOL);
1352    /// while let Some(result) = incoming.next().await {
1353    ///     if let Ok(conn) = result {
1354    ///         tokio::spawn(handle_connection(conn));
1355    ///     }
1356    /// }
1357    /// ```
1358    fn accept(&self, proto: ProtocolId) -> Incoming<Self::Conn>;
1359
1360    /// Dial a peer by their PeerId (preferred method).
1361    ///
1362    /// Uses the peer table to find known addresses for this peer.
1363    /// NAT traversal is handled automatically - if direct connection
1364    /// fails, coordination and hole-punching are attempted.
1365    ///
1366    /// # Errors
1367    /// - `PeerNotFound`: Peer not in table (need to bootstrap)
1368    /// - `ConnectionFailed`: Network error (may be transient)
1369    /// - `Timeout`: NAT traversal timed out (retry may succeed)
1370    fn dial(&self, peer: PeerId, proto: ProtocolId) -> BoxFuture<'_, LinkResult<Self::Conn>>;
1371
1372    /// Dial a peer by direct address (for bootstrapping).
1373    ///
1374    /// Use when you don't know the peer's ID yet, such as when
1375    /// connecting to a known seed address to join the network.
1376    ///
1377    /// After connection, the peer's ID will be available via
1378    /// `conn.peer()`.
1379    fn dial_addr(
1380        &self,
1381        addr: SocketAddr,
1382        proto: ProtocolId,
1383    ) -> BoxFuture<'_, LinkResult<Self::Conn>>;
1384
1385    /// Get protocols we advertise as supported.
1386    fn supported_protocols(&self) -> Vec<ProtocolId>;
1387
1388    /// Register a protocol as supported.
1389    ///
1390    /// Call this before `accept()` to receive connections for the protocol.
1391    /// Registered protocols are advertised to connected peers.
1392    fn register_protocol(&self, proto: ProtocolId);
1393
1394    /// Unregister a protocol.
1395    ///
1396    /// Stops accepting new connections for this protocol. Existing
1397    /// connections are not affected.
1398    fn unregister_protocol(&self, proto: ProtocolId);
1399
1400    /// Check if we have an active connection to a peer.
1401    fn is_connected(&self, peer: &PeerId) -> bool;
1402
1403    /// Get the count of active connections.
1404    fn active_connections(&self) -> usize;
1405
1406    /// Gracefully shutdown the transport.
1407    ///
1408    /// Closes all connections, stops accepting new ones, and flushes
1409    /// the bootstrap cache to disk. Pending operations will complete
1410    /// or error.
1411    ///
1412    /// Call this before exiting to ensure clean shutdown.
1413    fn shutdown(&self) -> BoxFuture<'_, ()>;
1414}
1415
1416// ============================================================================
1417// P2pEndpoint Implementation
1418// ============================================================================
1419
1420// The implementation of LinkTransport for P2pEndpoint is in a separate file
1421// to keep this module focused on the trait definitions.
1422
1423#[cfg(test)]
1424mod tests {
1425    use super::*;
1426
1427    #[test]
1428    fn test_protocol_id_from_string() {
1429        let proto = ProtocolId::from("saorsa-dht/1.0");
1430        assert_eq!(&proto.0[..14], b"saorsa-dht/1.0");
1431        assert_eq!(proto.0[14], 0);
1432        assert_eq!(proto.0[15], 0);
1433    }
1434
1435    #[test]
1436    fn test_protocol_id_truncation() {
1437        let proto = ProtocolId::from("this-is-a-very-long-protocol-name");
1438        assert_eq!(&proto.0, b"this-is-a-very-l");
1439    }
1440
1441    #[test]
1442    fn test_protocol_id_display() {
1443        let proto = ProtocolId::from("test/1.0");
1444        assert_eq!(format!("{}", proto), "test/1.0");
1445    }
1446
1447    #[test]
1448    fn test_capabilities_quality_score() {
1449        let mut caps = Capabilities::default();
1450
1451        // Default has perfect RTT (0ms) and no packet loss, so score should be high
1452        // Score = 0.5 (base) + 0.3 (RTT: 1.0*0.3) + 0.2 (loss: 1.0*0.2) = 1.0
1453        let base_score = caps.quality_score();
1454        assert!(
1455            (0.9..=1.0).contains(&base_score),
1456            "base_score = {}",
1457            base_score
1458        );
1459
1460        // Worse RTT should reduce score
1461        caps.rtt_ms_p50 = 150; // 50% of max
1462        let worse_rtt_score = caps.quality_score();
1463        assert!(
1464            worse_rtt_score < base_score,
1465            "worse RTT should reduce score"
1466        );
1467
1468        // Very bad RTT should reduce score more
1469        caps.rtt_ms_p50 = 500;
1470        let bad_rtt_score = caps.quality_score();
1471        assert!(
1472            bad_rtt_score < worse_rtt_score,
1473            "bad RTT should reduce score more"
1474        );
1475
1476        // Symmetric NAT should reduce score
1477        caps.rtt_ms_p50 = 50;
1478        caps.nat_type_hint = Some(NatHint::Symmetric);
1479        let nat_score = caps.quality_score();
1480        // Reset RTT for fair comparison
1481        caps.nat_type_hint = None;
1482        caps.rtt_ms_p50 = 50;
1483        let no_nat_score = caps.quality_score();
1484        assert!(
1485            nat_score < no_nat_score,
1486            "symmetric NAT should reduce score"
1487        );
1488    }
1489
1490    #[test]
1491    fn test_capabilities_supports_protocol() {
1492        let mut caps = Capabilities::default();
1493        let dht = ProtocolId::from("dht/1.0");
1494        let gossip = ProtocolId::from("gossip/1.0");
1495
1496        caps.protocols.push(dht);
1497
1498        assert!(caps.supports_protocol(&dht));
1499        assert!(!caps.supports_protocol(&gossip));
1500    }
1501
1502    // =========================================================================
1503    // Stream Type Tests
1504    // =========================================================================
1505
1506    #[test]
1507    fn test_stream_type_bytes() {
1508        assert_eq!(StreamType::Membership.as_byte(), 0x00);
1509        assert_eq!(StreamType::PubSub.as_byte(), 0x01);
1510        assert_eq!(StreamType::GossipBulk.as_byte(), 0x02);
1511        assert_eq!(StreamType::DhtQuery.as_byte(), 0x10);
1512        assert_eq!(StreamType::DhtStore.as_byte(), 0x11);
1513        assert_eq!(StreamType::DhtWitness.as_byte(), 0x12);
1514        assert_eq!(StreamType::DhtReplication.as_byte(), 0x13);
1515        assert_eq!(StreamType::WebRtcSignal.as_byte(), 0x20);
1516        assert_eq!(StreamType::WebRtcMedia.as_byte(), 0x21);
1517        assert_eq!(StreamType::WebRtcData.as_byte(), 0x22);
1518        assert_eq!(StreamType::Reserved.as_byte(), 0xF0);
1519    }
1520
1521    #[test]
1522    fn test_stream_type_from_byte() {
1523        assert_eq!(StreamType::from_byte(0x00), Some(StreamType::Membership));
1524        assert_eq!(StreamType::from_byte(0x10), Some(StreamType::DhtQuery));
1525        assert_eq!(StreamType::from_byte(0x20), Some(StreamType::WebRtcSignal));
1526        assert_eq!(StreamType::from_byte(0xF0), Some(StreamType::Reserved));
1527        assert_eq!(StreamType::from_byte(0x99), None); // Unassigned
1528        assert_eq!(StreamType::from_byte(0xFF), None); // Unassigned
1529    }
1530
1531    #[test]
1532    fn test_stream_type_families() {
1533        assert!(StreamType::Membership.is_gossip());
1534        assert!(StreamType::PubSub.is_gossip());
1535        assert!(StreamType::GossipBulk.is_gossip());
1536
1537        assert!(StreamType::DhtQuery.is_dht());
1538        assert!(StreamType::DhtStore.is_dht());
1539        assert!(StreamType::DhtWitness.is_dht());
1540        assert!(StreamType::DhtReplication.is_dht());
1541
1542        assert!(StreamType::WebRtcSignal.is_webrtc());
1543        assert!(StreamType::WebRtcMedia.is_webrtc());
1544        assert!(StreamType::WebRtcData.is_webrtc());
1545    }
1546
1547    #[test]
1548    fn test_stream_type_family_ranges() {
1549        assert!(StreamTypeFamily::Gossip.contains(0x00));
1550        assert!(StreamTypeFamily::Gossip.contains(0x0F));
1551        assert!(!StreamTypeFamily::Gossip.contains(0x10));
1552
1553        assert!(StreamTypeFamily::Dht.contains(0x10));
1554        assert!(StreamTypeFamily::Dht.contains(0x1F));
1555        assert!(!StreamTypeFamily::Dht.contains(0x20));
1556
1557        assert!(StreamTypeFamily::WebRtc.contains(0x20));
1558        assert!(StreamTypeFamily::WebRtc.contains(0x2F));
1559        assert!(!StreamTypeFamily::WebRtc.contains(0x30));
1560    }
1561
1562    #[test]
1563    fn test_stream_filter_accepts() {
1564        let filter = StreamFilter::new()
1565            .with_type(StreamType::Membership)
1566            .with_type(StreamType::DhtQuery);
1567
1568        assert!(filter.accepts(StreamType::Membership));
1569        assert!(filter.accepts(StreamType::DhtQuery));
1570        assert!(!filter.accepts(StreamType::PubSub));
1571        assert!(!filter.accepts(StreamType::WebRtcMedia));
1572    }
1573
1574    #[test]
1575    fn test_stream_filter_empty_accepts_all() {
1576        let filter = StreamFilter::new();
1577        assert!(filter.accepts_all());
1578        assert!(filter.accepts(StreamType::Membership));
1579        assert!(filter.accepts(StreamType::DhtQuery));
1580        assert!(filter.accepts(StreamType::WebRtcMedia));
1581    }
1582
1583    #[test]
1584    fn test_stream_filter_presets() {
1585        let gossip = StreamFilter::gossip_only();
1586        assert!(gossip.accepts(StreamType::Membership));
1587        assert!(gossip.accepts(StreamType::PubSub));
1588        assert!(gossip.accepts(StreamType::GossipBulk));
1589        assert!(!gossip.accepts(StreamType::DhtQuery));
1590
1591        let dht = StreamFilter::dht_only();
1592        assert!(dht.accepts(StreamType::DhtQuery));
1593        assert!(dht.accepts(StreamType::DhtStore));
1594        assert!(!dht.accepts(StreamType::Membership));
1595
1596        let webrtc = StreamFilter::webrtc_only();
1597        assert!(webrtc.accepts(StreamType::WebRtcSignal));
1598        assert!(webrtc.accepts(StreamType::WebRtcMedia));
1599        assert!(!webrtc.accepts(StreamType::DhtQuery));
1600    }
1601
1602    #[test]
1603    fn test_stream_type_display() {
1604        assert_eq!(format!("{}", StreamType::Membership), "Membership");
1605        assert_eq!(format!("{}", StreamType::DhtQuery), "DhtQuery");
1606        assert_eq!(format!("{}", StreamType::WebRtcMedia), "WebRtcMedia");
1607    }
1608
1609    // =========================================================================
1610    // Phase 1: ProtocolHandler Tests (TDD RED)
1611    // =========================================================================
1612
1613    mod protocol_handler_tests {
1614        use super::*;
1615        use std::sync::Arc;
1616        use std::sync::atomic::{AtomicUsize, Ordering};
1617
1618        /// Test handler implementation for testing
1619        struct TestHandler {
1620            types: Vec<StreamType>,
1621            call_count: Arc<AtomicUsize>,
1622        }
1623
1624        impl TestHandler {
1625            fn new(types: Vec<StreamType>) -> Self {
1626                Self {
1627                    types,
1628                    call_count: Arc::new(AtomicUsize::new(0)),
1629                }
1630            }
1631
1632            fn with_counter(types: Vec<StreamType>, counter: Arc<AtomicUsize>) -> Self {
1633                Self {
1634                    types,
1635                    call_count: counter,
1636                }
1637            }
1638        }
1639
1640        #[async_trait]
1641        impl ProtocolHandler for TestHandler {
1642            fn stream_types(&self) -> &[StreamType] {
1643                &self.types
1644            }
1645
1646            async fn handle_stream(
1647                &self,
1648                _peer: PeerId,
1649                _stream_type: StreamType,
1650                data: Bytes,
1651            ) -> LinkResult<Option<Bytes>> {
1652                self.call_count.fetch_add(1, Ordering::SeqCst);
1653                Ok(Some(data)) // Echo back
1654            }
1655
1656            fn name(&self) -> &str {
1657                "TestHandler"
1658            }
1659        }
1660
1661        #[test]
1662        fn test_handler_stream_types() {
1663            let handler = TestHandler::new(vec![StreamType::Membership, StreamType::PubSub]);
1664            assert_eq!(handler.stream_types().len(), 2);
1665            assert!(handler.stream_types().contains(&StreamType::Membership));
1666            assert!(handler.stream_types().contains(&StreamType::PubSub));
1667        }
1668
1669        #[tokio::test]
1670        async fn test_handler_returns_response() {
1671            let handler = TestHandler::new(vec![StreamType::DhtQuery]);
1672            let peer = PeerId::from([0u8; 32]);
1673
1674            let result = handler
1675                .handle_stream(peer, StreamType::DhtQuery, Bytes::from_static(b"test"))
1676                .await;
1677
1678            assert!(result.is_ok());
1679            assert_eq!(result.unwrap(), Some(Bytes::from_static(b"test")));
1680        }
1681
1682        #[tokio::test]
1683        async fn test_handler_no_response() {
1684            struct SinkHandler;
1685
1686            #[async_trait]
1687            impl ProtocolHandler for SinkHandler {
1688                fn stream_types(&self) -> &[StreamType] {
1689                    &[StreamType::GossipBulk]
1690                }
1691
1692                async fn handle_stream(
1693                    &self,
1694                    _peer: PeerId,
1695                    _stream_type: StreamType,
1696                    _data: Bytes,
1697                ) -> LinkResult<Option<Bytes>> {
1698                    Ok(None)
1699                }
1700            }
1701
1702            let handler = SinkHandler;
1703            let peer = PeerId::from([0u8; 32]);
1704
1705            let result = handler
1706                .handle_stream(peer, StreamType::GossipBulk, Bytes::from_static(b"data"))
1707                .await;
1708
1709            assert!(result.is_ok());
1710            assert!(result.unwrap().is_none());
1711        }
1712
1713        #[tokio::test]
1714        async fn test_handler_tracks_calls() {
1715            let count = Arc::new(AtomicUsize::new(0));
1716            let handler = TestHandler::with_counter(vec![StreamType::Membership], count.clone());
1717            let peer = PeerId::from([0u8; 32]);
1718
1719            assert_eq!(handler.name(), "TestHandler");
1720            assert_eq!(count.load(Ordering::SeqCst), 0);
1721
1722            let _ = handler
1723                .handle_stream(peer, StreamType::Membership, Bytes::new())
1724                .await;
1725            assert_eq!(count.load(Ordering::SeqCst), 1);
1726
1727            let _ = handler
1728                .handle_stream(peer, StreamType::Membership, Bytes::new())
1729                .await;
1730            assert_eq!(count.load(Ordering::SeqCst), 2);
1731        }
1732
1733        #[test]
1734        fn test_boxed_handler() {
1735            let handler: BoxedHandler = TestHandler::new(vec![StreamType::DhtStore]).boxed();
1736            assert_eq!(handler.stream_types(), &[StreamType::DhtStore]);
1737            assert_eq!(handler.name(), "TestHandler");
1738        }
1739
1740        #[tokio::test]
1741        async fn test_default_datagram_handler() {
1742            let handler = TestHandler::new(vec![StreamType::Membership]);
1743            let peer = PeerId::from([0u8; 32]);
1744
1745            // Default implementation should succeed silently
1746            let result = handler
1747                .handle_datagram(peer, StreamType::Membership, Bytes::from_static(b"dgram"))
1748                .await;
1749            assert!(result.is_ok());
1750        }
1751
1752        #[tokio::test]
1753        async fn test_default_shutdown() {
1754            let handler = TestHandler::new(vec![StreamType::Membership]);
1755
1756            // Default shutdown implementation should succeed
1757            let result = handler.shutdown().await;
1758            assert!(result.is_ok());
1759        }
1760    }
1761
1762    // =========================================================================
1763    // Phase 2: Handler Error Tests (TDD RED)
1764    // =========================================================================
1765
1766    mod handler_error_tests {
1767        use super::*;
1768
1769        #[test]
1770        fn test_handler_exists_error() {
1771            let err = LinkError::HandlerExists(StreamType::Membership);
1772            let msg = err.to_string();
1773            assert!(msg.contains("Membership"), "Error message: {}", msg);
1774            assert!(
1775                msg.to_lowercase().contains("handler"),
1776                "Error message: {}",
1777                msg
1778            );
1779        }
1780
1781        #[test]
1782        fn test_no_handler_error() {
1783            let err = LinkError::NoHandler(StreamType::DhtQuery);
1784            let msg = err.to_string();
1785            assert!(msg.contains("DhtQuery"), "Error message: {}", msg);
1786        }
1787
1788        #[test]
1789        fn test_not_running_error() {
1790            let err = LinkError::NotRunning;
1791            let msg = err.to_string();
1792            assert!(
1793                msg.to_lowercase().contains("not running"),
1794                "Error message: {}",
1795                msg
1796            );
1797        }
1798
1799        #[test]
1800        fn test_already_running_error() {
1801            let err = LinkError::AlreadyRunning;
1802            let msg = err.to_string();
1803            assert!(
1804                msg.to_lowercase().contains("already running"),
1805                "Error message: {}",
1806                msg
1807            );
1808        }
1809    }
1810}