ant_quic/link_transport.rs
1// Copyright 2024 Saorsa Labs Ltd.
2//
3// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
4// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
5//
6// Full details available at https://saorsalabs.com/licenses
7
8//! # Link Transport Abstraction Layer
9//!
10//! This module provides the [`LinkTransport`] and [`LinkConn`] traits that abstract
11//! the transport layer for overlay networks like saorsa-core. This enables:
12//!
13//! - **Version decoupling**: Overlays can compile against a stable trait interface
14//! while ant-quic evolves underneath
15//! - **Testing**: Mock transports for unit testing overlay logic
16//! - **Alternative transports**: Future support for WebRTC, TCP fallback, etc.
17//!
18//! ## Architecture
19//!
20//! ```text
21//! ┌─────────────────────────────────────────────────────────────────┐
22//! │ saorsa-core (overlay) │
23//! │ DHT routing │ Record storage │ Greedy routing │ Naming │
24//! └─────────────────────────────────────────────────────────────────┘
25//! │
26//! ▼
27//! ┌─────────────────────────────────────────────────────────────────┐
28//! │ LinkTransport trait │
29//! │ local_peer() │ peer_table() │ dial() │ accept() │ subscribe() │
30//! └─────────────────────────────────────────────────────────────────┘
31//! │
32//! ▼
33//! ┌─────────────────────────────────────────────────────────────────┐
34//! │ ant-quic P2pEndpoint │
35//! │ QUIC transport │ NAT traversal │ PQC │ Connection management │
36//! └─────────────────────────────────────────────────────────────────┘
37//! ```
38//!
39//! ## Example: Implementing an Overlay
40//!
41//! ```rust,ignore
42//! use ant_quic::link_transport::{LinkTransport, LinkConn, LinkEvent, ProtocolId, LinkError};
43//! use std::sync::Arc;
44//! use futures_util::StreamExt;
45//!
46//! // Define your overlay's protocol identifier
47//! const DHT_PROTOCOL: ProtocolId = ProtocolId::from_static(b"saorsa-dht/1.0.0");
48//!
49//! async fn run_overlay<T: LinkTransport>(transport: Arc<T>) -> anyhow::Result<()> {
50//! // Register our protocol so peers know we support it
51//! transport.register_protocol(DHT_PROTOCOL);
52//!
53//! // Subscribe to transport events for connection lifecycle
54//! let mut events = transport.subscribe();
55//! tokio::spawn(async move {
56//! while let Ok(event) = events.recv().await {
57//! match event {
58//! LinkEvent::PeerConnected { peer, caps } => {
59//! println!("New peer: {:?}, relay: {}", peer, caps.supports_relay);
60//! }
61//! LinkEvent::PeerDisconnected { peer, reason } => {
62//! println!("Lost peer: {:?}, reason: {:?}", peer, reason);
63//! }
64//! _ => {}
65//! }
66//! }
67//! });
68//!
69//! // Accept incoming connections in a background task
70//! let transport_clone = transport.clone();
71//! tokio::spawn(async move {
72//! let mut incoming = transport_clone.accept(DHT_PROTOCOL);
73//! while let Some(result) = incoming.next().await {
74//! match result {
75//! Ok(conn) => {
76//! println!("Accepted connection from {:?}", conn.peer());
77//! // Handle connection...
78//! }
79//! Err(e) => eprintln!("Accept error: {}", e),
80//! }
81//! }
82//! });
83//!
84//! // Dial a peer using their PeerId (NAT traversal handled automatically)
85//! let peers = transport.peer_table();
86//! if let Some((peer_id, caps)) = peers.first() {
87//! match transport.dial(*peer_id, DHT_PROTOCOL).await {
88//! Ok(conn) => {
89//! // Open a bidirectional stream for request/response
90//! let (mut send, mut recv) = conn.open_bi().await?;
91//! send.write_all(b"PING").await?;
92//! send.finish()?;
93//!
94//! let response = recv.read_to_end(1024).await?;
95//! println!("Response: {:?}", response);
96//! }
97//! Err(LinkError::PeerNotFound(_)) => {
98//! println!("Peer not in table - need to bootstrap");
99//! }
100//! Err(e) => eprintln!("Dial failed: {}", e),
101//! }
102//! }
103//!
104//! Ok(())
105//! }
106//! ```
107//!
108//! ## Choosing Stream Types
109//!
110//! - **Bidirectional (`open_bi`)**: Use for request/response patterns where both
111//! sides send and receive. Example: RPC calls, file transfers with acknowledgment.
112//!
113//! - **Unidirectional (`open_uni`)**: Use for one-way messages where no response
114//! is needed. Example: event notifications, log streaming, pub/sub.
115//!
116//! - **Datagrams (`send_datagram`)**: Use for small, unreliable messages where
117//! latency matters more than reliability. Example: heartbeats, real-time metrics.
118//!
119//! ## Error Handling Patterns
120//!
121//! ```rust,ignore
122//! use ant_quic::link_transport::{LinkError, LinkResult};
123//!
124//! async fn connect_with_retry<T: LinkTransport>(
125//! transport: &T,
126//! peer: PeerId,
127//! proto: ProtocolId,
128//! ) -> LinkResult<T::Conn> {
129//! for attempt in 1..=3 {
130//! match transport.dial(peer, proto).await {
131//! Ok(conn) => return Ok(conn),
132//! Err(LinkError::PeerNotFound(_)) => {
133//! // Peer not in table - can't retry, need bootstrap
134//! return Err(LinkError::PeerNotFound(format!("{:?}", peer)));
135//! }
136//! Err(LinkError::ConnectionFailed(msg)) if attempt < 3 => {
137//! // Transient failure - retry after delay
138//! tokio::time::sleep(Duration::from_millis(100 * attempt as u64)).await;
139//! continue;
140//! }
141//! Err(LinkError::Timeout) if attempt < 3 => {
142//! // NAT traversal may need multiple attempts
143//! continue;
144//! }
145//! Err(e) => return Err(e),
146//! }
147//! }
148//! Err(LinkError::ConnectionFailed("max retries exceeded".into()))
149//! }
150//! ```
151
152use std::collections::HashSet;
153use std::fmt;
154use std::future::Future;
155use std::net::SocketAddr;
156use std::pin::Pin;
157use std::time::{Duration, SystemTime};
158
159use async_trait::async_trait;
160use bytes::Bytes;
161use serde::{Deserialize, Serialize};
162use thiserror::Error;
163use tokio::sync::broadcast;
164
165use crate::nat_traversal_api::PeerId;
166use crate::transport::TransportAddr;
167
168// ============================================================================
169// Stream Type Registry (Protocol Multiplexing)
170// ============================================================================
171
172/// Stream type identifier - the first byte of each QUIC stream.
173///
174/// This enum provides a hardcoded registry of protocol types for multiplexing
175/// multiple protocols over a single QUIC connection. Each stream's first byte
176/// identifies its protocol type.
177///
178/// # Protocol Ranges
179///
180/// | Range | Protocol Family | Types |
181/// |-------|-----------------|-------|
182/// | 0x00-0x0F | Gossip | Membership, PubSub, Bulk |
183/// | 0x10-0x1F | DHT | Query, Store, Witness, Replication |
184/// | 0x20-0x2F | WebRTC | Signal, Media, Data |
185/// | 0xF0-0xFF | Reserved | Future use |
186///
187/// # Example
188///
189/// ```rust
190/// use ant_quic::link_transport::StreamType;
191///
192/// // Check if a byte is a valid stream type
193/// let stream_type = StreamType::from_byte(0x10);
194/// assert_eq!(stream_type, Some(StreamType::DhtQuery));
195///
196/// // Get all gossip types
197/// for st in StreamType::gossip_types() {
198/// println!("Gossip type: {}", st);
199/// }
200/// ```
201#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
202#[repr(u8)]
203pub enum StreamType {
204 // =========================================================================
205 // Gossip Protocols (0x00-0x0F)
206 // =========================================================================
207 /// Membership protocol messages (HyParView, SWIM).
208 Membership = 0x00,
209
210 /// PubSub protocol messages (Plumtree).
211 PubSub = 0x01,
212
213 /// Bulk gossip data transfer (CRDT deltas, large payloads).
214 GossipBulk = 0x02,
215
216 // =========================================================================
217 // DHT Protocols (0x10-0x1F)
218 // =========================================================================
219 /// DHT query operations (GET, FIND_NODE, FIND_VALUE).
220 DhtQuery = 0x10,
221
222 /// DHT store operations (PUT, STORE).
223 DhtStore = 0x11,
224
225 /// DHT witness operations (Byzantine fault tolerance).
226 DhtWitness = 0x12,
227
228 /// DHT replication operations (background repair).
229 DhtReplication = 0x13,
230
231 // =========================================================================
232 // WebRTC Protocols (0x20-0x2F)
233 // =========================================================================
234 /// WebRTC signaling (SDP, ICE candidates via QUIC).
235 WebRtcSignal = 0x20,
236
237 /// WebRTC media streams (audio/video RTP).
238 WebRtcMedia = 0x21,
239
240 /// WebRTC data channels.
241 WebRtcData = 0x22,
242
243 // =========================================================================
244 // Reserved (0xF0-0xFF)
245 // =========================================================================
246 /// Reserved for future protocols.
247 Reserved = 0xF0,
248}
249
250impl StreamType {
251 /// Parse a stream type from its byte value.
252 ///
253 /// Returns `None` for unknown/unassigned values.
254 #[inline]
255 pub fn from_byte(byte: u8) -> Option<Self> {
256 match byte {
257 0x00 => Some(Self::Membership),
258 0x01 => Some(Self::PubSub),
259 0x02 => Some(Self::GossipBulk),
260 0x10 => Some(Self::DhtQuery),
261 0x11 => Some(Self::DhtStore),
262 0x12 => Some(Self::DhtWitness),
263 0x13 => Some(Self::DhtReplication),
264 0x20 => Some(Self::WebRtcSignal),
265 0x21 => Some(Self::WebRtcMedia),
266 0x22 => Some(Self::WebRtcData),
267 0xF0 => Some(Self::Reserved),
268 _ => None,
269 }
270 }
271
272 /// Get the byte value for this stream type.
273 #[inline]
274 pub const fn as_byte(self) -> u8 {
275 self as u8
276 }
277
278 /// Get the protocol family for this stream type.
279 #[inline]
280 pub const fn family(self) -> StreamTypeFamily {
281 match self as u8 {
282 0x00..=0x0F => StreamTypeFamily::Gossip,
283 0x10..=0x1F => StreamTypeFamily::Dht,
284 0x20..=0x2F => StreamTypeFamily::WebRtc,
285 _ => StreamTypeFamily::Reserved,
286 }
287 }
288
289 /// Check if this is a gossip protocol type.
290 #[inline]
291 pub const fn is_gossip(self) -> bool {
292 matches!(self.family(), StreamTypeFamily::Gossip)
293 }
294
295 /// Check if this is a DHT protocol type.
296 #[inline]
297 pub const fn is_dht(self) -> bool {
298 matches!(self.family(), StreamTypeFamily::Dht)
299 }
300
301 /// Check if this is a WebRTC protocol type.
302 #[inline]
303 pub const fn is_webrtc(self) -> bool {
304 matches!(self.family(), StreamTypeFamily::WebRtc)
305 }
306
307 /// Get all gossip stream types.
308 pub const fn gossip_types() -> &'static [StreamType] {
309 &[Self::Membership, Self::PubSub, Self::GossipBulk]
310 }
311
312 /// Get all DHT stream types.
313 pub const fn dht_types() -> &'static [StreamType] {
314 &[
315 Self::DhtQuery,
316 Self::DhtStore,
317 Self::DhtWitness,
318 Self::DhtReplication,
319 ]
320 }
321
322 /// Get all WebRTC stream types.
323 pub const fn webrtc_types() -> &'static [StreamType] {
324 &[Self::WebRtcSignal, Self::WebRtcMedia, Self::WebRtcData]
325 }
326
327 /// Get all defined stream types.
328 pub const fn all_types() -> &'static [StreamType] {
329 &[
330 Self::Membership,
331 Self::PubSub,
332 Self::GossipBulk,
333 Self::DhtQuery,
334 Self::DhtStore,
335 Self::DhtWitness,
336 Self::DhtReplication,
337 Self::WebRtcSignal,
338 Self::WebRtcMedia,
339 Self::WebRtcData,
340 Self::Reserved,
341 ]
342 }
343}
344
345impl fmt::Display for StreamType {
346 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
347 match self {
348 Self::Membership => write!(f, "Membership"),
349 Self::PubSub => write!(f, "PubSub"),
350 Self::GossipBulk => write!(f, "GossipBulk"),
351 Self::DhtQuery => write!(f, "DhtQuery"),
352 Self::DhtStore => write!(f, "DhtStore"),
353 Self::DhtWitness => write!(f, "DhtWitness"),
354 Self::DhtReplication => write!(f, "DhtReplication"),
355 Self::WebRtcSignal => write!(f, "WebRtcSignal"),
356 Self::WebRtcMedia => write!(f, "WebRtcMedia"),
357 Self::WebRtcData => write!(f, "WebRtcData"),
358 Self::Reserved => write!(f, "Reserved"),
359 }
360 }
361}
362
363impl From<StreamType> for u8 {
364 fn from(st: StreamType) -> Self {
365 st as u8
366 }
367}
368
369impl TryFrom<u8> for StreamType {
370 type Error = LinkError;
371
372 fn try_from(byte: u8) -> Result<Self, Self::Error> {
373 Self::from_byte(byte).ok_or(LinkError::InvalidStreamType(byte))
374 }
375}
376
377/// Protocol family for stream types.
378#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
379pub enum StreamTypeFamily {
380 /// Gossip protocols (0x00-0x0F).
381 Gossip,
382 /// DHT protocols (0x10-0x1F).
383 Dht,
384 /// WebRTC protocols (0x20-0x2F).
385 WebRtc,
386 /// Reserved (0xF0-0xFF).
387 Reserved,
388}
389
390impl StreamTypeFamily {
391 /// Get the byte range for this protocol family.
392 pub const fn byte_range(self) -> (u8, u8) {
393 match self {
394 Self::Gossip => (0x00, 0x0F),
395 Self::Dht => (0x10, 0x1F),
396 Self::WebRtc => (0x20, 0x2F),
397 Self::Reserved => (0xF0, 0xFF),
398 }
399 }
400
401 /// Check if a byte is in this family's range.
402 pub const fn contains(self, byte: u8) -> bool {
403 let (start, end) = self.byte_range();
404 byte >= start && byte <= end
405 }
406}
407
408/// A filter for accepting specific stream types.
409///
410/// Use this with `accept_bi_typed` and `accept_uni_typed` to filter
411/// incoming streams by protocol type.
412///
413/// # Example
414///
415/// ```rust
416/// use ant_quic::link_transport::{StreamFilter, StreamType};
417///
418/// // Accept only DHT streams
419/// let filter = StreamFilter::new()
420/// .with_types(StreamType::dht_types());
421///
422/// // Accept gossip and DHT
423/// let filter = StreamFilter::new()
424/// .with_type(StreamType::Membership)
425/// .with_type(StreamType::DhtQuery);
426/// ```
427#[derive(Debug, Clone, Default)]
428pub struct StreamFilter {
429 /// Allowed stream types. Empty means accept all.
430 allowed: HashSet<StreamType>,
431}
432
433impl StreamFilter {
434 /// Create a new empty filter (accepts all types).
435 pub fn new() -> Self {
436 Self::default()
437 }
438
439 /// Create a filter that accepts all stream types.
440 pub fn accept_all() -> Self {
441 let mut filter = Self::new();
442 for st in StreamType::all_types() {
443 filter.allowed.insert(*st);
444 }
445 filter
446 }
447
448 /// Create a filter for gossip streams only.
449 pub fn gossip_only() -> Self {
450 Self::new().with_types(StreamType::gossip_types())
451 }
452
453 /// Create a filter for DHT streams only.
454 pub fn dht_only() -> Self {
455 Self::new().with_types(StreamType::dht_types())
456 }
457
458 /// Create a filter for WebRTC streams only.
459 pub fn webrtc_only() -> Self {
460 Self::new().with_types(StreamType::webrtc_types())
461 }
462
463 /// Add a single stream type to the filter.
464 pub fn with_type(mut self, stream_type: StreamType) -> Self {
465 self.allowed.insert(stream_type);
466 self
467 }
468
469 /// Add multiple stream types to the filter.
470 pub fn with_types(mut self, stream_types: &[StreamType]) -> Self {
471 for st in stream_types {
472 self.allowed.insert(*st);
473 }
474 self
475 }
476
477 /// Check if a stream type is accepted by this filter.
478 pub fn accepts(&self, stream_type: StreamType) -> bool {
479 self.allowed.is_empty() || self.allowed.contains(&stream_type)
480 }
481
482 /// Check if this filter accepts any type (is empty).
483 pub fn accepts_all(&self) -> bool {
484 self.allowed.is_empty()
485 }
486
487 /// Get the set of allowed types.
488 pub fn allowed_types(&self) -> &HashSet<StreamType> {
489 &self.allowed
490 }
491}
492
493// ============================================================================
494// Protocol Identifier
495// ============================================================================
496
497/// Protocol identifier for multiplexing multiple overlays on a single transport.
498///
499/// Protocols are identified by a 16-byte value, allowing efficient binary comparison
500/// while supporting human-readable names during debugging.
501///
502/// # Examples
503///
504/// ```rust
505/// use ant_quic::link_transport::ProtocolId;
506///
507/// // From a static string (padded/truncated to 16 bytes)
508/// const DHT: ProtocolId = ProtocolId::from_static(b"saorsa-dht/1.0.0");
509///
510/// // From bytes
511/// let proto = ProtocolId::new([0x73, 0x61, 0x6f, 0x72, 0x73, 0x61, 0x2d, 0x64,
512/// 0x68, 0x74, 0x2f, 0x31, 0x2e, 0x30, 0x2e, 0x30]);
513/// ```
514#[derive(Clone, Copy, PartialEq, Eq, Hash)]
515pub struct ProtocolId(pub [u8; 16]);
516
517impl ProtocolId {
518 /// Create a new protocol ID from raw bytes.
519 #[inline]
520 pub const fn new(bytes: [u8; 16]) -> Self {
521 Self(bytes)
522 }
523
524 /// Create a protocol ID from a static byte string.
525 ///
526 /// The string is padded with zeros if shorter than 16 bytes,
527 /// or truncated if longer.
528 #[inline]
529 pub const fn from_static(s: &[u8]) -> Self {
530 let mut bytes = [0u8; 16];
531 let len = if s.len() < 16 { s.len() } else { 16 };
532 let mut i = 0;
533 while i < len {
534 bytes[i] = s[i];
535 i += 1;
536 }
537 Self(bytes)
538 }
539
540 /// Get the raw bytes of this protocol ID.
541 #[inline]
542 pub const fn as_bytes(&self) -> &[u8; 16] {
543 &self.0
544 }
545
546 /// The default protocol for connections without explicit protocol negotiation.
547 pub const DEFAULT: Self = Self::from_static(b"ant-quic/default");
548
549 /// Protocol ID for NAT traversal coordination messages.
550 pub const NAT_TRAVERSAL: Self = Self::from_static(b"ant-quic/nat");
551
552 /// Protocol ID for relay traffic.
553 pub const RELAY: Self = Self::from_static(b"ant-quic/relay");
554}
555
556impl Default for ProtocolId {
557 fn default() -> Self {
558 Self::DEFAULT
559 }
560}
561
562impl fmt::Debug for ProtocolId {
563 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
564 // Try to display as UTF-8 string, trimming null bytes
565 let end = self.0.iter().position(|&b| b == 0).unwrap_or(16);
566 if let Ok(s) = std::str::from_utf8(&self.0[..end]) {
567 write!(f, "ProtocolId({:?})", s)
568 } else {
569 write!(f, "ProtocolId({:?})", hex::encode(self.0))
570 }
571 }
572}
573
574impl fmt::Display for ProtocolId {
575 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
576 let end = self.0.iter().position(|&b| b == 0).unwrap_or(16);
577 if let Ok(s) = std::str::from_utf8(&self.0[..end]) {
578 write!(f, "{}", s)
579 } else {
580 write!(f, "{}", hex::encode(self.0))
581 }
582 }
583}
584
585impl From<&str> for ProtocolId {
586 fn from(s: &str) -> Self {
587 Self::from_static(s.as_bytes())
588 }
589}
590
591impl From<[u8; 16]> for ProtocolId {
592 fn from(bytes: [u8; 16]) -> Self {
593 Self(bytes)
594 }
595}
596
597// ============================================================================
598// Peer Capabilities
599// ============================================================================
600
601/// NAT type classification hint for connection strategy selection.
602#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
603pub enum NatHint {
604 /// No NAT detected (public IP, direct connectivity)
605 None,
606 /// Full cone NAT (easiest to traverse)
607 FullCone,
608 /// Address-restricted cone NAT
609 AddressRestrictedCone,
610 /// Port-restricted cone NAT
611 PortRestrictedCone,
612 /// Symmetric NAT (hardest to traverse, may require relay)
613 Symmetric,
614 /// Unknown NAT type
615 #[default]
616 Unknown,
617}
618
619/// Capabilities and quality metrics for a connected peer.
620///
621/// This struct captures both static capabilities (what the peer can do)
622/// and dynamic metrics (how well the peer is performing).
623#[derive(Debug, Clone)]
624pub struct Capabilities {
625 /// Whether this peer can relay traffic for NAT traversal.
626 pub supports_relay: bool,
627
628 /// Whether this peer can coordinate NAT hole-punching.
629 pub supports_coordination: bool,
630
631 /// Observed external addresses for this peer.
632 pub observed_addrs: Vec<SocketAddr>,
633
634 /// Protocols this peer advertises support for.
635 pub protocols: Vec<ProtocolId>,
636
637 /// Last time we successfully communicated with this peer.
638 pub last_seen: SystemTime,
639
640 /// Median round-trip time in milliseconds (p50).
641 pub rtt_ms_p50: u32,
642
643 /// Estimated RTT jitter in milliseconds.
644 pub rtt_jitter_ms: u32,
645
646 /// Packet loss rate (0.0 to 1.0).
647 pub packet_loss: f32,
648
649 /// Inferred NAT type for connection strategy hints.
650 pub nat_type_hint: Option<NatHint>,
651
652 /// Peer's advertised bandwidth limit (bytes/sec), if any.
653 pub bandwidth_limit: Option<u64>,
654
655 /// Number of successful connections to this peer.
656 pub successful_connections: u32,
657
658 /// Number of failed connection attempts to this peer.
659 pub failed_connections: u32,
660
661 /// Whether this peer is currently connected.
662 pub is_connected: bool,
663}
664
665impl Default for Capabilities {
666 fn default() -> Self {
667 Self {
668 supports_relay: false,
669 supports_coordination: false,
670 observed_addrs: Vec::new(),
671 protocols: Vec::new(),
672 last_seen: SystemTime::UNIX_EPOCH,
673 rtt_ms_p50: 0,
674 rtt_jitter_ms: 0,
675 packet_loss: 0.0,
676 nat_type_hint: None,
677 bandwidth_limit: None,
678 successful_connections: 0,
679 failed_connections: 0,
680 is_connected: false,
681 }
682 }
683}
684
685impl Capabilities {
686 /// Create capabilities for a newly connected peer.
687 pub fn new_connected(addr: SocketAddr) -> Self {
688 Self {
689 observed_addrs: vec![addr],
690 last_seen: SystemTime::now(),
691 is_connected: true,
692 ..Default::default()
693 }
694 }
695
696 /// Calculate a quality score for peer selection (0.0 to 1.0).
697 ///
698 /// Higher scores indicate better peers for connection.
699 pub fn quality_score(&self) -> f32 {
700 let mut score = 0.5; // Base score
701
702 // RTT component (lower is better, max 300ms considered)
703 let rtt_score = 1.0 - (self.rtt_ms_p50 as f32 / 300.0).min(1.0);
704 score += rtt_score * 0.3;
705
706 // Packet loss component
707 let loss_score = 1.0 - self.packet_loss;
708 score += loss_score * 0.2;
709
710 // Connection success rate
711 let total = self.successful_connections + self.failed_connections;
712 if total > 0 {
713 let success_rate = self.successful_connections as f32 / total as f32;
714 score += success_rate * 0.2;
715 }
716
717 // Capability bonus
718 if self.supports_relay {
719 score += 0.05;
720 }
721 if self.supports_coordination {
722 score += 0.05;
723 }
724
725 // NAT type penalty
726 if let Some(nat) = self.nat_type_hint {
727 match nat {
728 NatHint::None | NatHint::FullCone => {}
729 NatHint::AddressRestrictedCone | NatHint::PortRestrictedCone => {
730 score -= 0.05;
731 }
732 NatHint::Symmetric => {
733 score -= 0.15;
734 }
735 NatHint::Unknown => {
736 score -= 0.02;
737 }
738 }
739 }
740
741 score.clamp(0.0, 1.0)
742 }
743
744 /// Check if this peer supports a specific protocol.
745 pub fn supports_protocol(&self, proto: &ProtocolId) -> bool {
746 self.protocols.contains(proto)
747 }
748}
749
750// ============================================================================
751// Link Events
752// ============================================================================
753
754/// Reason for peer disconnection.
755#[derive(Debug, Clone, PartialEq, Eq)]
756pub enum DisconnectReason {
757 /// Clean shutdown initiated by local side.
758 LocalClose,
759 /// Clean shutdown initiated by remote side.
760 RemoteClose,
761 /// Connection timed out.
762 Timeout,
763 /// Transport error occurred.
764 TransportError(String),
765 /// Application-level error code.
766 ApplicationError(u64),
767 /// Connection was reset.
768 Reset,
769}
770
771/// Events emitted by the link transport layer.
772///
773/// These events notify the overlay about significant transport-level changes.
774#[derive(Debug, Clone)]
775pub enum LinkEvent {
776 /// A new peer has connected.
777 PeerConnected {
778 /// The connected peer's ID.
779 peer: PeerId,
780 /// Initial capabilities (may be updated later).
781 caps: Capabilities,
782 },
783
784 /// A peer has disconnected.
785 PeerDisconnected {
786 /// The disconnected peer's ID.
787 peer: PeerId,
788 /// Reason for disconnection.
789 reason: DisconnectReason,
790 },
791
792 /// Our observed external address has been updated.
793 ExternalAddressUpdated {
794 /// The new external address (supports all transport types).
795 addr: TransportAddr,
796 },
797
798 /// A peer's capabilities have been updated.
799 CapabilityUpdated {
800 /// The peer whose capabilities changed.
801 peer: PeerId,
802 /// Updated capabilities.
803 caps: Capabilities,
804 },
805
806 /// A relay request has been received.
807 RelayRequest {
808 /// Peer requesting the relay.
809 from: PeerId,
810 /// Target peer for the relay.
811 to: PeerId,
812 /// Bytes remaining in relay budget.
813 budget_bytes: u64,
814 },
815
816 /// A NAT traversal coordination request has been received.
817 CoordinationRequest {
818 /// First peer in the coordination.
819 peer_a: PeerId,
820 /// Second peer in the coordination.
821 peer_b: PeerId,
822 /// Coordination round number.
823 round: u64,
824 },
825
826 /// The bootstrap cache has been updated.
827 BootstrapCacheUpdated {
828 /// Number of peers in the cache.
829 peer_count: usize,
830 },
831}
832
833// ============================================================================
834// Protocol Handler Abstraction
835// ============================================================================
836
837/// Handler for specific protocol stream types.
838///
839/// Implement this trait to handle incoming streams by protocol type.
840/// Each handler declares which [`StreamType`]s it processes and receives
841/// matching streams via [`Self::handle_stream`].
842///
843/// # Example
844///
845/// ```rust,ignore
846/// use ant_quic::link_transport::{ProtocolHandler, StreamType, LinkResult, PeerId};
847/// use async_trait::async_trait;
848/// use bytes::Bytes;
849///
850/// struct GossipHandler;
851///
852/// #[async_trait]
853/// impl ProtocolHandler for GossipHandler {
854/// fn stream_types(&self) -> &[StreamType] {
855/// StreamType::gossip_types()
856/// }
857///
858/// async fn handle_stream(
859/// &self,
860/// peer: PeerId,
861/// stream_type: StreamType,
862/// data: Bytes,
863/// ) -> LinkResult<Option<Bytes>> {
864/// // Process incoming gossip message, optionally return response
865/// Ok(None)
866/// }
867/// }
868/// ```
869#[async_trait]
870pub trait ProtocolHandler: Send + Sync {
871 /// Get the stream types this handler processes.
872 fn stream_types(&self) -> &[StreamType];
873
874 /// Handle an incoming stream.
875 ///
876 /// # Arguments
877 ///
878 /// * `peer` - The peer that sent the stream
879 /// * `stream_type` - The type of stream received
880 /// * `data` - The stream payload data
881 ///
882 /// # Returns
883 ///
884 /// * `Ok(Some(response))` - Send response back to the peer
885 /// * `Ok(None)` - No response (close stream gracefully)
886 /// * `Err(e)` - Handler error (stream closed with error)
887 async fn handle_stream(
888 &self,
889 peer: PeerId,
890 stream_type: StreamType,
891 data: Bytes,
892 ) -> LinkResult<Option<Bytes>>;
893
894 /// Handle an incoming datagram.
895 ///
896 /// Default implementation does nothing. Override for unreliable messaging.
897 async fn handle_datagram(
898 &self,
899 _peer: PeerId,
900 _stream_type: StreamType,
901 _data: Bytes,
902 ) -> LinkResult<()> {
903 Ok(())
904 }
905
906 /// Called when the handler is being shut down.
907 ///
908 /// Default implementation does nothing. Override for cleanup.
909 async fn shutdown(&self) -> LinkResult<()> {
910 Ok(())
911 }
912
913 /// Get a human-readable name for this handler.
914 ///
915 /// Used in logging and debugging.
916 fn name(&self) -> &str {
917 "ProtocolHandler"
918 }
919}
920
921/// A boxed protocol handler for dynamic dispatch.
922pub type BoxedHandler = Box<dyn ProtocolHandler>;
923
924/// Extension trait for creating boxed handlers.
925pub trait ProtocolHandlerExt: ProtocolHandler + Sized + 'static {
926 /// Box this handler for use with [`crate::SharedTransport`].
927 fn boxed(self) -> BoxedHandler {
928 Box::new(self)
929 }
930}
931
932impl<T: ProtocolHandler + 'static> ProtocolHandlerExt for T {}
933
934// ============================================================================
935// Link Transport Errors
936// ============================================================================
937
938/// Errors that can occur during link transport operations.
939#[derive(Debug, Error, Clone)]
940pub enum LinkError {
941 /// The connection was closed.
942 #[error("connection closed")]
943 ConnectionClosed,
944
945 /// Failed to establish connection.
946 #[error("connection failed: {0}")]
947 ConnectionFailed(String),
948
949 /// The peer is not known/reachable.
950 #[error("peer not found: {0}")]
951 PeerNotFound(String),
952
953 /// Protocol negotiation failed.
954 #[error("protocol not supported: {0}")]
955 ProtocolNotSupported(ProtocolId),
956
957 /// A timeout occurred.
958 #[error("operation timed out")]
959 Timeout,
960
961 /// The stream was reset by the peer.
962 #[error("stream reset: error code {0}")]
963 StreamReset(u64),
964
965 /// An I/O error occurred.
966 #[error("I/O error: {0}")]
967 Io(String),
968
969 /// The transport is shutting down.
970 #[error("transport shutdown")]
971 Shutdown,
972
973 /// Rate limit exceeded.
974 #[error("rate limit exceeded")]
975 RateLimited,
976
977 /// Internal error.
978 #[error("internal error: {0}")]
979 Internal(String),
980
981 /// Invalid stream type byte.
982 #[error("invalid stream type byte: 0x{0:02x}")]
983 InvalidStreamType(u8),
984
985 /// Stream type not accepted by filter.
986 #[error("stream type {0} not accepted")]
987 StreamTypeFiltered(StreamType),
988
989 /// Handler already registered for stream type.
990 #[error("handler already exists for stream type: {0}")]
991 HandlerExists(StreamType),
992
993 /// No handler registered for stream type.
994 #[error("no handler for stream type: {0}")]
995 NoHandler(StreamType),
996
997 /// Transport not running.
998 #[error("transport not running")]
999 NotRunning,
1000
1001 /// Transport already running.
1002 #[error("transport already running")]
1003 AlreadyRunning,
1004}
1005
1006impl From<std::io::Error> for LinkError {
1007 fn from(e: std::io::Error) -> Self {
1008 Self::Io(e.to_string())
1009 }
1010}
1011
1012/// Result type for link transport operations.
1013pub type LinkResult<T> = Result<T, LinkError>;
1014
1015// ============================================================================
1016// Link Connection Trait
1017// ============================================================================
1018
1019/// A boxed future for async operations.
1020pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
1021
1022/// A boxed stream for async iteration.
1023pub type BoxStream<'a, T> = Pin<Box<dyn futures_util::Stream<Item = T> + Send + 'a>>;
1024
1025/// A connection to a remote peer.
1026///
1027/// This trait abstracts a single QUIC connection, providing methods to
1028/// open streams and send/receive datagrams. Connections are obtained via
1029/// [`LinkTransport::dial`] or [`LinkTransport::accept`].
1030///
1031/// # Stream Types
1032///
1033/// - **Bidirectional streams** (`open_bi`): Both endpoints can send and receive.
1034/// Use for request/response patterns.
1035/// - **Unidirectional streams** (`open_uni`): Only the opener can send.
1036/// Use for notifications or one-way data transfer.
1037/// - **Datagrams** (`send_datagram`): Unreliable, unordered messages.
1038/// Use for real-time data where latency > reliability.
1039///
1040/// # Connection Lifecycle
1041///
1042/// 1. Connection established (via dial or accept)
1043/// 2. Open streams as needed
1044/// 3. Close gracefully with `close()` or let it drop
1045pub trait LinkConn: Send + Sync {
1046 /// Get the remote peer's cryptographic identity.
1047 ///
1048 /// This is stable across reconnections and network changes.
1049 fn peer(&self) -> PeerId;
1050
1051 /// Get the remote peer's current network address.
1052 ///
1053 /// Note: This may change during the connection lifetime due to
1054 /// NAT rebinding or connection migration.
1055 fn remote_addr(&self) -> SocketAddr;
1056
1057 /// Open a unidirectional stream (send only).
1058 ///
1059 /// The remote peer will receive this stream via their `accept_uni()`.
1060 /// Use for one-way messages like notifications or log streams.
1061 ///
1062 /// # Example
1063 /// ```rust,ignore
1064 /// let mut stream = conn.open_uni().await?;
1065 /// stream.write_all(b"notification").await?;
1066 /// stream.finish()?; // Signal end of stream
1067 /// ```
1068 fn open_uni(&self) -> BoxFuture<'_, LinkResult<Box<dyn LinkSendStream>>>;
1069
1070 /// Open a bidirectional stream for request/response communication.
1071 ///
1072 /// Returns a (send, recv) pair. Both sides can write and read.
1073 /// Use for RPC, file transfers, or any interactive protocol.
1074 ///
1075 /// # Example
1076 /// ```rust,ignore
1077 /// let (mut send, mut recv) = conn.open_bi().await?;
1078 /// send.write_all(b"request").await?;
1079 /// send.finish()?;
1080 /// let response = recv.read_to_end(4096).await?;
1081 /// ```
1082 fn open_bi(
1083 &self,
1084 ) -> BoxFuture<'_, LinkResult<(Box<dyn LinkSendStream>, Box<dyn LinkRecvStream>)>>;
1085
1086 /// Open a typed unidirectional stream.
1087 ///
1088 /// The stream type byte is automatically prepended to the stream.
1089 /// The remote peer should use `accept_uni_typed` to receive.
1090 ///
1091 /// # Example
1092 /// ```rust,ignore
1093 /// let mut stream = conn.open_uni_typed(StreamType::Membership).await?;
1094 /// stream.write_all(b"membership update").await?;
1095 /// stream.finish()?;
1096 /// ```
1097 fn open_uni_typed(
1098 &self,
1099 stream_type: StreamType,
1100 ) -> BoxFuture<'_, LinkResult<Box<dyn LinkSendStream>>>;
1101
1102 /// Open a typed bidirectional stream.
1103 ///
1104 /// The stream type byte is automatically prepended to the stream.
1105 /// The remote peer should use `accept_bi_typed` to receive.
1106 ///
1107 /// # Example
1108 /// ```rust,ignore
1109 /// let (mut send, mut recv) = conn.open_bi_typed(StreamType::DhtQuery).await?;
1110 /// send.write_all(b"query request").await?;
1111 /// send.finish()?;
1112 /// let response = recv.read_to_end(4096).await?;
1113 /// ```
1114 fn open_bi_typed(
1115 &self,
1116 stream_type: StreamType,
1117 ) -> BoxFuture<'_, LinkResult<(Box<dyn LinkSendStream>, Box<dyn LinkRecvStream>)>>;
1118
1119 /// Accept incoming unidirectional streams with type filtering.
1120 ///
1121 /// Returns a stream of (type, recv_stream) pairs for streams
1122 /// matching the filter. Use `StreamFilter::new()` to accept all types.
1123 ///
1124 /// # Example
1125 /// ```rust,ignore
1126 /// let filter = StreamFilter::gossip_only();
1127 /// let mut incoming = conn.accept_uni_typed(filter);
1128 /// while let Some(result) = incoming.next().await {
1129 /// let (stream_type, recv) = result?;
1130 /// println!("Got {} stream", stream_type);
1131 /// }
1132 /// ```
1133 fn accept_uni_typed(
1134 &self,
1135 filter: StreamFilter,
1136 ) -> BoxStream<'_, LinkResult<(StreamType, Box<dyn LinkRecvStream>)>>;
1137
1138 /// Accept incoming bidirectional streams with type filtering.
1139 ///
1140 /// Returns a stream of (type, send_stream, recv_stream) tuples for
1141 /// streams matching the filter. Use `StreamFilter::new()` to accept all types.
1142 ///
1143 /// # Example
1144 /// ```rust,ignore
1145 /// let filter = StreamFilter::dht_only();
1146 /// let mut incoming = conn.accept_bi_typed(filter);
1147 /// while let Some(result) = incoming.next().await {
1148 /// let (stream_type, send, recv) = result?;
1149 /// // Handle DHT request/response
1150 /// }
1151 /// ```
1152 fn accept_bi_typed(
1153 &self,
1154 filter: StreamFilter,
1155 ) -> BoxStream<'_, LinkResult<(StreamType, Box<dyn LinkSendStream>, Box<dyn LinkRecvStream>)>>;
1156
1157 /// Send an unreliable datagram to the peer.
1158 ///
1159 /// Datagrams are:
1160 /// - **Unreliable**: May be dropped without notification
1161 /// - **Unordered**: May arrive out of order
1162 /// - **Size-limited**: Must fit in a single QUIC packet (~1200 bytes)
1163 ///
1164 /// Use for heartbeats, metrics, or real-time data where occasional
1165 /// loss is acceptable.
1166 fn send_datagram(&self, data: Bytes) -> LinkResult<()>;
1167
1168 /// Receive datagrams from the peer.
1169 ///
1170 /// Returns a stream of datagrams. Each datagram is delivered as-is
1171 /// (no framing). The stream ends when the connection closes.
1172 fn recv_datagrams(&self) -> BoxStream<'_, Bytes>;
1173
1174 /// Close the connection gracefully.
1175 ///
1176 /// # Parameters
1177 /// - `error_code`: Application-defined error code (0 = normal close)
1178 /// - `reason`: Human-readable reason for debugging
1179 fn close(&self, error_code: u64, reason: &str);
1180
1181 /// Check if the connection is still open.
1182 ///
1183 /// Returns false after the connection has been closed (locally or remotely)
1184 /// or if a fatal error occurred.
1185 fn is_open(&self) -> bool;
1186
1187 /// Get current connection statistics.
1188 ///
1189 /// Useful for monitoring connection health and debugging performance.
1190 fn stats(&self) -> ConnectionStats;
1191}
1192
1193/// Statistics for a connection.
1194///
1195/// Updated in real-time as the connection handles data. Use for:
1196/// - Monitoring connection health
1197/// - Detecting congestion (high RTT, packet loss)
1198/// - Debugging performance issues
1199///
1200/// # Typical Values
1201///
1202/// | Metric | Good | Concerning | Critical |
1203/// |--------|------|------------|----------|
1204/// | RTT | <50ms | 50-200ms | >500ms |
1205/// | Packet loss | <0.1% | 0.1-1% | >5% |
1206///
1207/// # Example
1208/// ```rust,ignore
1209/// let stats = conn.stats();
1210/// if stats.rtt > Duration::from_millis(200) {
1211/// log::warn!("High latency: {:?}", stats.rtt);
1212/// }
1213/// if stats.packets_lost > stats.bytes_sent / 100 {
1214/// log::warn!("Significant packet loss detected");
1215/// }
1216/// ```
1217#[derive(Debug, Clone, Default)]
1218pub struct ConnectionStats {
1219 /// Total bytes sent on this connection (including retransmits).
1220 pub bytes_sent: u64,
1221 /// Total bytes received on this connection.
1222 pub bytes_received: u64,
1223 /// Current smoothed round-trip time estimate.
1224 /// Calculated using QUIC's RTT estimation algorithm.
1225 pub rtt: Duration,
1226 /// How long this connection has been established.
1227 pub connected_duration: Duration,
1228 /// Total number of streams opened (bidirectional + unidirectional).
1229 pub streams_opened: u64,
1230 /// Estimated packets lost during transmission.
1231 /// High values indicate congestion or poor network conditions.
1232 pub packets_lost: u64,
1233}
1234
1235/// A send stream for writing data to a peer.
1236pub trait LinkSendStream: Send + Sync {
1237 /// Write data to the stream.
1238 fn write<'a>(&'a mut self, data: &'a [u8]) -> BoxFuture<'a, LinkResult<usize>>;
1239
1240 /// Write all data to the stream.
1241 fn write_all<'a>(&'a mut self, data: &'a [u8]) -> BoxFuture<'a, LinkResult<()>>;
1242
1243 /// Finish the stream (signal end of data).
1244 fn finish(&mut self) -> LinkResult<()>;
1245
1246 /// Reset the stream with an error code.
1247 fn reset(&mut self, error_code: u64) -> LinkResult<()>;
1248
1249 /// Get the stream ID.
1250 fn id(&self) -> u64;
1251}
1252
1253/// A receive stream for reading data from a peer.
1254pub trait LinkRecvStream: Send + Sync {
1255 /// Read data from the stream.
1256 fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> BoxFuture<'a, LinkResult<Option<usize>>>;
1257
1258 /// Read all data until the stream ends.
1259 fn read_to_end(&mut self, size_limit: usize) -> BoxFuture<'_, LinkResult<Vec<u8>>>;
1260
1261 /// Stop receiving data (signal we don't want more).
1262 fn stop(&mut self, error_code: u64) -> LinkResult<()>;
1263
1264 /// Get the stream ID.
1265 fn id(&self) -> u64;
1266}
1267
1268// ============================================================================
1269// Link Transport Trait
1270// ============================================================================
1271
1272/// Incoming connection stream.
1273pub type Incoming<C> = BoxStream<'static, LinkResult<C>>;
1274
1275/// The primary transport abstraction for overlay networks.
1276///
1277/// This trait provides everything an overlay needs to establish connections,
1278/// send/receive data, and monitor the transport layer.
1279///
1280/// # Implementation Notes
1281///
1282/// Implementors should:
1283/// - Handle NAT traversal transparently
1284/// - Maintain a peer table with capabilities
1285/// - Emit events for connection state changes
1286/// - Support protocol multiplexing
1287///
1288/// # Example Implementation
1289///
1290/// The default implementation wraps `P2pEndpoint`:
1291///
1292/// ```rust,ignore
1293/// let config = P2pConfig::builder()
1294/// .bind_addr("0.0.0.0:0".parse()?)
1295/// .build()?;
1296/// let endpoint = P2pEndpoint::new(config).await?;
1297/// let transport: Arc<dyn LinkTransport<Conn = P2pLinkConn>> = Arc::new(endpoint);
1298/// ```
1299pub trait LinkTransport: Send + Sync + 'static {
1300 /// The connection type returned by this transport.
1301 type Conn: LinkConn + 'static;
1302
1303 /// Get our local peer identity.
1304 ///
1305 /// This is our stable cryptographic identity, derived from our key pair.
1306 /// It remains constant across restarts and network changes.
1307 fn local_peer(&self) -> PeerId;
1308
1309 /// Get our externally observed address, if known.
1310 ///
1311 /// Returns the address other peers see when we connect to them.
1312 /// This is discovered via:
1313 /// - OBSERVED_ADDRESS frames from connected peers
1314 /// - NAT traversal address discovery
1315 ///
1316 /// Returns `None` if we haven't connected to any peers yet or
1317 /// if we're behind a symmetric NAT that changes our external port.
1318 fn external_address(&self) -> Option<SocketAddr>;
1319
1320 /// Get all known peers with their capabilities.
1321 ///
1322 /// Includes:
1323 /// - Currently connected peers (`caps.is_connected = true`)
1324 /// - Previously connected peers still in bootstrap cache
1325 /// - Peers learned from relay/coordination traffic
1326 ///
1327 /// Use `Capabilities::quality_score()` to rank peers for selection.
1328 fn peer_table(&self) -> Vec<(PeerId, Capabilities)>;
1329
1330 /// Get capabilities for a specific peer.
1331 ///
1332 /// Returns `None` if the peer is not known.
1333 fn peer_capabilities(&self, peer: &PeerId) -> Option<Capabilities>;
1334
1335 /// Subscribe to transport-level events.
1336 ///
1337 /// Events include peer connections/disconnections, address changes,
1338 /// and capability updates. Use for maintaining overlay state.
1339 ///
1340 /// Multiple subscribers are supported via broadcast channel.
1341 fn subscribe(&self) -> broadcast::Receiver<LinkEvent>;
1342
1343 /// Accept incoming connections for a specific protocol.
1344 ///
1345 /// Returns a stream of connections from peers that want to speak
1346 /// the specified protocol. Register your protocol first with
1347 /// `register_protocol()`.
1348 ///
1349 /// # Example
1350 /// ```rust,ignore
1351 /// let mut incoming = transport.accept(MY_PROTOCOL);
1352 /// while let Some(result) = incoming.next().await {
1353 /// if let Ok(conn) = result {
1354 /// tokio::spawn(handle_connection(conn));
1355 /// }
1356 /// }
1357 /// ```
1358 fn accept(&self, proto: ProtocolId) -> Incoming<Self::Conn>;
1359
1360 /// Dial a peer by their PeerId (preferred method).
1361 ///
1362 /// Uses the peer table to find known addresses for this peer.
1363 /// NAT traversal is handled automatically - if direct connection
1364 /// fails, coordination and hole-punching are attempted.
1365 ///
1366 /// # Errors
1367 /// - `PeerNotFound`: Peer not in table (need to bootstrap)
1368 /// - `ConnectionFailed`: Network error (may be transient)
1369 /// - `Timeout`: NAT traversal timed out (retry may succeed)
1370 fn dial(&self, peer: PeerId, proto: ProtocolId) -> BoxFuture<'_, LinkResult<Self::Conn>>;
1371
1372 /// Dial a peer by direct address (for bootstrapping).
1373 ///
1374 /// Use when you don't know the peer's ID yet, such as when
1375 /// connecting to a known seed address to join the network.
1376 ///
1377 /// After connection, the peer's ID will be available via
1378 /// `conn.peer()`.
1379 fn dial_addr(
1380 &self,
1381 addr: SocketAddr,
1382 proto: ProtocolId,
1383 ) -> BoxFuture<'_, LinkResult<Self::Conn>>;
1384
1385 /// Get protocols we advertise as supported.
1386 fn supported_protocols(&self) -> Vec<ProtocolId>;
1387
1388 /// Register a protocol as supported.
1389 ///
1390 /// Call this before `accept()` to receive connections for the protocol.
1391 /// Registered protocols are advertised to connected peers.
1392 fn register_protocol(&self, proto: ProtocolId);
1393
1394 /// Unregister a protocol.
1395 ///
1396 /// Stops accepting new connections for this protocol. Existing
1397 /// connections are not affected.
1398 fn unregister_protocol(&self, proto: ProtocolId);
1399
1400 /// Check if we have an active connection to a peer.
1401 fn is_connected(&self, peer: &PeerId) -> bool;
1402
1403 /// Get the count of active connections.
1404 fn active_connections(&self) -> usize;
1405
1406 /// Gracefully shutdown the transport.
1407 ///
1408 /// Closes all connections, stops accepting new ones, and flushes
1409 /// the bootstrap cache to disk. Pending operations will complete
1410 /// or error.
1411 ///
1412 /// Call this before exiting to ensure clean shutdown.
1413 fn shutdown(&self) -> BoxFuture<'_, ()>;
1414}
1415
1416// ============================================================================
1417// P2pEndpoint Implementation
1418// ============================================================================
1419
1420// The implementation of LinkTransport for P2pEndpoint is in a separate file
1421// to keep this module focused on the trait definitions.
1422
1423#[cfg(test)]
1424mod tests {
1425 use super::*;
1426
1427 #[test]
1428 fn test_protocol_id_from_string() {
1429 let proto = ProtocolId::from("saorsa-dht/1.0");
1430 assert_eq!(&proto.0[..14], b"saorsa-dht/1.0");
1431 assert_eq!(proto.0[14], 0);
1432 assert_eq!(proto.0[15], 0);
1433 }
1434
1435 #[test]
1436 fn test_protocol_id_truncation() {
1437 let proto = ProtocolId::from("this-is-a-very-long-protocol-name");
1438 assert_eq!(&proto.0, b"this-is-a-very-l");
1439 }
1440
1441 #[test]
1442 fn test_protocol_id_display() {
1443 let proto = ProtocolId::from("test/1.0");
1444 assert_eq!(format!("{}", proto), "test/1.0");
1445 }
1446
1447 #[test]
1448 fn test_capabilities_quality_score() {
1449 let mut caps = Capabilities::default();
1450
1451 // Default has perfect RTT (0ms) and no packet loss, so score should be high
1452 // Score = 0.5 (base) + 0.3 (RTT: 1.0*0.3) + 0.2 (loss: 1.0*0.2) = 1.0
1453 let base_score = caps.quality_score();
1454 assert!(
1455 (0.9..=1.0).contains(&base_score),
1456 "base_score = {}",
1457 base_score
1458 );
1459
1460 // Worse RTT should reduce score
1461 caps.rtt_ms_p50 = 150; // 50% of max
1462 let worse_rtt_score = caps.quality_score();
1463 assert!(
1464 worse_rtt_score < base_score,
1465 "worse RTT should reduce score"
1466 );
1467
1468 // Very bad RTT should reduce score more
1469 caps.rtt_ms_p50 = 500;
1470 let bad_rtt_score = caps.quality_score();
1471 assert!(
1472 bad_rtt_score < worse_rtt_score,
1473 "bad RTT should reduce score more"
1474 );
1475
1476 // Symmetric NAT should reduce score
1477 caps.rtt_ms_p50 = 50;
1478 caps.nat_type_hint = Some(NatHint::Symmetric);
1479 let nat_score = caps.quality_score();
1480 // Reset RTT for fair comparison
1481 caps.nat_type_hint = None;
1482 caps.rtt_ms_p50 = 50;
1483 let no_nat_score = caps.quality_score();
1484 assert!(
1485 nat_score < no_nat_score,
1486 "symmetric NAT should reduce score"
1487 );
1488 }
1489
1490 #[test]
1491 fn test_capabilities_supports_protocol() {
1492 let mut caps = Capabilities::default();
1493 let dht = ProtocolId::from("dht/1.0");
1494 let gossip = ProtocolId::from("gossip/1.0");
1495
1496 caps.protocols.push(dht);
1497
1498 assert!(caps.supports_protocol(&dht));
1499 assert!(!caps.supports_protocol(&gossip));
1500 }
1501
1502 // =========================================================================
1503 // Stream Type Tests
1504 // =========================================================================
1505
1506 #[test]
1507 fn test_stream_type_bytes() {
1508 assert_eq!(StreamType::Membership.as_byte(), 0x00);
1509 assert_eq!(StreamType::PubSub.as_byte(), 0x01);
1510 assert_eq!(StreamType::GossipBulk.as_byte(), 0x02);
1511 assert_eq!(StreamType::DhtQuery.as_byte(), 0x10);
1512 assert_eq!(StreamType::DhtStore.as_byte(), 0x11);
1513 assert_eq!(StreamType::DhtWitness.as_byte(), 0x12);
1514 assert_eq!(StreamType::DhtReplication.as_byte(), 0x13);
1515 assert_eq!(StreamType::WebRtcSignal.as_byte(), 0x20);
1516 assert_eq!(StreamType::WebRtcMedia.as_byte(), 0x21);
1517 assert_eq!(StreamType::WebRtcData.as_byte(), 0x22);
1518 assert_eq!(StreamType::Reserved.as_byte(), 0xF0);
1519 }
1520
1521 #[test]
1522 fn test_stream_type_from_byte() {
1523 assert_eq!(StreamType::from_byte(0x00), Some(StreamType::Membership));
1524 assert_eq!(StreamType::from_byte(0x10), Some(StreamType::DhtQuery));
1525 assert_eq!(StreamType::from_byte(0x20), Some(StreamType::WebRtcSignal));
1526 assert_eq!(StreamType::from_byte(0xF0), Some(StreamType::Reserved));
1527 assert_eq!(StreamType::from_byte(0x99), None); // Unassigned
1528 assert_eq!(StreamType::from_byte(0xFF), None); // Unassigned
1529 }
1530
1531 #[test]
1532 fn test_stream_type_families() {
1533 assert!(StreamType::Membership.is_gossip());
1534 assert!(StreamType::PubSub.is_gossip());
1535 assert!(StreamType::GossipBulk.is_gossip());
1536
1537 assert!(StreamType::DhtQuery.is_dht());
1538 assert!(StreamType::DhtStore.is_dht());
1539 assert!(StreamType::DhtWitness.is_dht());
1540 assert!(StreamType::DhtReplication.is_dht());
1541
1542 assert!(StreamType::WebRtcSignal.is_webrtc());
1543 assert!(StreamType::WebRtcMedia.is_webrtc());
1544 assert!(StreamType::WebRtcData.is_webrtc());
1545 }
1546
1547 #[test]
1548 fn test_stream_type_family_ranges() {
1549 assert!(StreamTypeFamily::Gossip.contains(0x00));
1550 assert!(StreamTypeFamily::Gossip.contains(0x0F));
1551 assert!(!StreamTypeFamily::Gossip.contains(0x10));
1552
1553 assert!(StreamTypeFamily::Dht.contains(0x10));
1554 assert!(StreamTypeFamily::Dht.contains(0x1F));
1555 assert!(!StreamTypeFamily::Dht.contains(0x20));
1556
1557 assert!(StreamTypeFamily::WebRtc.contains(0x20));
1558 assert!(StreamTypeFamily::WebRtc.contains(0x2F));
1559 assert!(!StreamTypeFamily::WebRtc.contains(0x30));
1560 }
1561
1562 #[test]
1563 fn test_stream_filter_accepts() {
1564 let filter = StreamFilter::new()
1565 .with_type(StreamType::Membership)
1566 .with_type(StreamType::DhtQuery);
1567
1568 assert!(filter.accepts(StreamType::Membership));
1569 assert!(filter.accepts(StreamType::DhtQuery));
1570 assert!(!filter.accepts(StreamType::PubSub));
1571 assert!(!filter.accepts(StreamType::WebRtcMedia));
1572 }
1573
1574 #[test]
1575 fn test_stream_filter_empty_accepts_all() {
1576 let filter = StreamFilter::new();
1577 assert!(filter.accepts_all());
1578 assert!(filter.accepts(StreamType::Membership));
1579 assert!(filter.accepts(StreamType::DhtQuery));
1580 assert!(filter.accepts(StreamType::WebRtcMedia));
1581 }
1582
1583 #[test]
1584 fn test_stream_filter_presets() {
1585 let gossip = StreamFilter::gossip_only();
1586 assert!(gossip.accepts(StreamType::Membership));
1587 assert!(gossip.accepts(StreamType::PubSub));
1588 assert!(gossip.accepts(StreamType::GossipBulk));
1589 assert!(!gossip.accepts(StreamType::DhtQuery));
1590
1591 let dht = StreamFilter::dht_only();
1592 assert!(dht.accepts(StreamType::DhtQuery));
1593 assert!(dht.accepts(StreamType::DhtStore));
1594 assert!(!dht.accepts(StreamType::Membership));
1595
1596 let webrtc = StreamFilter::webrtc_only();
1597 assert!(webrtc.accepts(StreamType::WebRtcSignal));
1598 assert!(webrtc.accepts(StreamType::WebRtcMedia));
1599 assert!(!webrtc.accepts(StreamType::DhtQuery));
1600 }
1601
1602 #[test]
1603 fn test_stream_type_display() {
1604 assert_eq!(format!("{}", StreamType::Membership), "Membership");
1605 assert_eq!(format!("{}", StreamType::DhtQuery), "DhtQuery");
1606 assert_eq!(format!("{}", StreamType::WebRtcMedia), "WebRtcMedia");
1607 }
1608
1609 // =========================================================================
1610 // Phase 1: ProtocolHandler Tests (TDD RED)
1611 // =========================================================================
1612
1613 mod protocol_handler_tests {
1614 use super::*;
1615 use std::sync::Arc;
1616 use std::sync::atomic::{AtomicUsize, Ordering};
1617
1618 /// Test handler implementation for testing
1619 struct TestHandler {
1620 types: Vec<StreamType>,
1621 call_count: Arc<AtomicUsize>,
1622 }
1623
1624 impl TestHandler {
1625 fn new(types: Vec<StreamType>) -> Self {
1626 Self {
1627 types,
1628 call_count: Arc::new(AtomicUsize::new(0)),
1629 }
1630 }
1631
1632 fn with_counter(types: Vec<StreamType>, counter: Arc<AtomicUsize>) -> Self {
1633 Self {
1634 types,
1635 call_count: counter,
1636 }
1637 }
1638 }
1639
1640 #[async_trait]
1641 impl ProtocolHandler for TestHandler {
1642 fn stream_types(&self) -> &[StreamType] {
1643 &self.types
1644 }
1645
1646 async fn handle_stream(
1647 &self,
1648 _peer: PeerId,
1649 _stream_type: StreamType,
1650 data: Bytes,
1651 ) -> LinkResult<Option<Bytes>> {
1652 self.call_count.fetch_add(1, Ordering::SeqCst);
1653 Ok(Some(data)) // Echo back
1654 }
1655
1656 fn name(&self) -> &str {
1657 "TestHandler"
1658 }
1659 }
1660
1661 #[test]
1662 fn test_handler_stream_types() {
1663 let handler = TestHandler::new(vec![StreamType::Membership, StreamType::PubSub]);
1664 assert_eq!(handler.stream_types().len(), 2);
1665 assert!(handler.stream_types().contains(&StreamType::Membership));
1666 assert!(handler.stream_types().contains(&StreamType::PubSub));
1667 }
1668
1669 #[tokio::test]
1670 async fn test_handler_returns_response() {
1671 let handler = TestHandler::new(vec![StreamType::DhtQuery]);
1672 let peer = PeerId::from([0u8; 32]);
1673
1674 let result = handler
1675 .handle_stream(peer, StreamType::DhtQuery, Bytes::from_static(b"test"))
1676 .await;
1677
1678 assert!(result.is_ok());
1679 assert_eq!(result.unwrap(), Some(Bytes::from_static(b"test")));
1680 }
1681
1682 #[tokio::test]
1683 async fn test_handler_no_response() {
1684 struct SinkHandler;
1685
1686 #[async_trait]
1687 impl ProtocolHandler for SinkHandler {
1688 fn stream_types(&self) -> &[StreamType] {
1689 &[StreamType::GossipBulk]
1690 }
1691
1692 async fn handle_stream(
1693 &self,
1694 _peer: PeerId,
1695 _stream_type: StreamType,
1696 _data: Bytes,
1697 ) -> LinkResult<Option<Bytes>> {
1698 Ok(None)
1699 }
1700 }
1701
1702 let handler = SinkHandler;
1703 let peer = PeerId::from([0u8; 32]);
1704
1705 let result = handler
1706 .handle_stream(peer, StreamType::GossipBulk, Bytes::from_static(b"data"))
1707 .await;
1708
1709 assert!(result.is_ok());
1710 assert!(result.unwrap().is_none());
1711 }
1712
1713 #[tokio::test]
1714 async fn test_handler_tracks_calls() {
1715 let count = Arc::new(AtomicUsize::new(0));
1716 let handler = TestHandler::with_counter(vec![StreamType::Membership], count.clone());
1717 let peer = PeerId::from([0u8; 32]);
1718
1719 assert_eq!(handler.name(), "TestHandler");
1720 assert_eq!(count.load(Ordering::SeqCst), 0);
1721
1722 let _ = handler
1723 .handle_stream(peer, StreamType::Membership, Bytes::new())
1724 .await;
1725 assert_eq!(count.load(Ordering::SeqCst), 1);
1726
1727 let _ = handler
1728 .handle_stream(peer, StreamType::Membership, Bytes::new())
1729 .await;
1730 assert_eq!(count.load(Ordering::SeqCst), 2);
1731 }
1732
1733 #[test]
1734 fn test_boxed_handler() {
1735 let handler: BoxedHandler = TestHandler::new(vec![StreamType::DhtStore]).boxed();
1736 assert_eq!(handler.stream_types(), &[StreamType::DhtStore]);
1737 assert_eq!(handler.name(), "TestHandler");
1738 }
1739
1740 #[tokio::test]
1741 async fn test_default_datagram_handler() {
1742 let handler = TestHandler::new(vec![StreamType::Membership]);
1743 let peer = PeerId::from([0u8; 32]);
1744
1745 // Default implementation should succeed silently
1746 let result = handler
1747 .handle_datagram(peer, StreamType::Membership, Bytes::from_static(b"dgram"))
1748 .await;
1749 assert!(result.is_ok());
1750 }
1751
1752 #[tokio::test]
1753 async fn test_default_shutdown() {
1754 let handler = TestHandler::new(vec![StreamType::Membership]);
1755
1756 // Default shutdown implementation should succeed
1757 let result = handler.shutdown().await;
1758 assert!(result.is_ok());
1759 }
1760 }
1761
1762 // =========================================================================
1763 // Phase 2: Handler Error Tests (TDD RED)
1764 // =========================================================================
1765
1766 mod handler_error_tests {
1767 use super::*;
1768
1769 #[test]
1770 fn test_handler_exists_error() {
1771 let err = LinkError::HandlerExists(StreamType::Membership);
1772 let msg = err.to_string();
1773 assert!(msg.contains("Membership"), "Error message: {}", msg);
1774 assert!(
1775 msg.to_lowercase().contains("handler"),
1776 "Error message: {}",
1777 msg
1778 );
1779 }
1780
1781 #[test]
1782 fn test_no_handler_error() {
1783 let err = LinkError::NoHandler(StreamType::DhtQuery);
1784 let msg = err.to_string();
1785 assert!(msg.contains("DhtQuery"), "Error message: {}", msg);
1786 }
1787
1788 #[test]
1789 fn test_not_running_error() {
1790 let err = LinkError::NotRunning;
1791 let msg = err.to_string();
1792 assert!(
1793 msg.to_lowercase().contains("not running"),
1794 "Error message: {}",
1795 msg
1796 );
1797 }
1798
1799 #[test]
1800 fn test_already_running_error() {
1801 let err = LinkError::AlreadyRunning;
1802 let msg = err.to_string();
1803 assert!(
1804 msg.to_lowercase().contains("already running"),
1805 "Error message: {}",
1806 msg
1807 );
1808 }
1809 }
1810}