ant_quic/link_transport.rs
1// Copyright 2024 Saorsa Labs Ltd.
2//
3// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
4// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
5//
6// Full details available at https://saorsalabs.com/licenses
7
8//! # Link Transport Abstraction Layer
9//!
10//! This module provides the [`LinkTransport`] and [`LinkConn`] traits that abstract
11//! the transport layer for overlay networks like saorsa-core. This enables:
12//!
13//! - **Version decoupling**: Overlays can compile against a stable trait interface
14//! while ant-quic evolves underneath
15//! - **Testing**: Mock transports for unit testing overlay logic
16//! - **Alternative transports**: Future support for WebRTC, TCP fallback, etc.
17//!
18//! ## Architecture
19//!
20//! ```text
21//! ┌─────────────────────────────────────────────────────────────────┐
22//! │ saorsa-core (overlay) │
23//! │ DHT routing │ Record storage │ Greedy routing │ Naming │
24//! └─────────────────────────────────────────────────────────────────┘
25//! │
26//! ▼
27//! ┌─────────────────────────────────────────────────────────────────┐
28//! │ LinkTransport trait │
29//! │ local_peer() │ peer_table() │ dial() │ accept() │ subscribe() │
30//! └─────────────────────────────────────────────────────────────────┘
31//! │
32//! ▼
33//! ┌─────────────────────────────────────────────────────────────────┐
34//! │ ant-quic P2pEndpoint │
35//! │ QUIC transport │ NAT traversal │ PQC │ Connection management │
36//! └─────────────────────────────────────────────────────────────────┘
37//! ```
38//!
39//! ## Example: Implementing an Overlay
40//!
41//! ```rust,ignore
42//! use ant_quic::link_transport::{LinkTransport, LinkConn, LinkEvent, ProtocolId, LinkError};
43//! use std::sync::Arc;
44//! use futures_util::StreamExt;
45//!
46//! // Define your overlay's protocol identifier
47//! const DHT_PROTOCOL: ProtocolId = ProtocolId::from_static(b"saorsa-dht/1.0.0");
48//!
49//! async fn run_overlay<T: LinkTransport>(transport: Arc<T>) -> anyhow::Result<()> {
50//! // Register our protocol so peers know we support it
51//! transport.register_protocol(DHT_PROTOCOL);
52//!
53//! // Subscribe to transport events for connection lifecycle
54//! let mut events = transport.subscribe();
55//! tokio::spawn(async move {
56//! while let Ok(event) = events.recv().await {
57//! match event {
58//! LinkEvent::PeerConnected { peer, caps } => {
59//! println!("New peer: {:?}, relay: {}", peer, caps.supports_relay);
60//! }
61//! LinkEvent::PeerDisconnected { peer, reason } => {
62//! println!("Lost peer: {:?}, reason: {:?}", peer, reason);
63//! }
64//! _ => {}
65//! }
66//! }
67//! });
68//!
69//! // Accept incoming connections in a background task
70//! let transport_clone = transport.clone();
71//! tokio::spawn(async move {
72//! let mut incoming = transport_clone.accept(DHT_PROTOCOL);
73//! while let Some(result) = incoming.next().await {
74//! match result {
75//! Ok(conn) => {
76//! println!("Accepted connection from {:?}", conn.peer());
77//! // Handle connection...
78//! }
79//! Err(e) => eprintln!("Accept error: {}", e),
80//! }
81//! }
82//! });
83//!
84//! // Dial a peer using their PeerId (NAT traversal handled automatically)
85//! let peers = transport.peer_table();
86//! if let Some((peer_id, caps)) = peers.first() {
87//! match transport.dial(*peer_id, DHT_PROTOCOL).await {
88//! Ok(conn) => {
89//! // Open a bidirectional stream for request/response
90//! let (mut send, mut recv) = conn.open_bi().await?;
91//! send.write_all(b"PING").await?;
92//! send.finish()?;
93//!
94//! let response = recv.read_to_end(1024).await?;
95//! println!("Response: {:?}", response);
96//! }
97//! Err(LinkError::PeerNotFound(_)) => {
98//! println!("Peer not in table - need to bootstrap");
99//! }
100//! Err(e) => eprintln!("Dial failed: {}", e),
101//! }
102//! }
103//!
104//! Ok(())
105//! }
106//! ```
107//!
108//! ## Choosing Stream Types
109//!
110//! - **Bidirectional (`open_bi`)**: Use for request/response patterns where both
111//! sides send and receive. Example: RPC calls, file transfers with acknowledgment.
112//!
113//! - **Unidirectional (`open_uni`)**: Use for one-way messages where no response
114//! is needed. Example: event notifications, log streaming, pub/sub.
115//!
116//! - **Datagrams (`send_datagram`)**: Use for small, unreliable messages where
117//! latency matters more than reliability. Example: heartbeats, real-time metrics.
118//!
119//! ## Error Handling Patterns
120//!
121//! ```rust,ignore
//! use ant_quic::link_transport::{LinkError, LinkResult, LinkTransport, ProtocolId};
//! use ant_quic::nat_traversal_api::PeerId;
//! use std::time::Duration;
123//!
124//! async fn connect_with_retry<T: LinkTransport>(
125//! transport: &T,
126//! peer: PeerId,
127//! proto: ProtocolId,
128//! ) -> LinkResult<T::Conn> {
129//! for attempt in 1..=3 {
130//! match transport.dial(peer, proto).await {
131//! Ok(conn) => return Ok(conn),
132//! Err(LinkError::PeerNotFound(_)) => {
133//! // Peer not in table - can't retry, need bootstrap
134//! return Err(LinkError::PeerNotFound(format!("{:?}", peer)));
135//! }
136//! Err(LinkError::ConnectionFailed(msg)) if attempt < 3 => {
137//! // Transient failure - retry after delay
138//! tokio::time::sleep(Duration::from_millis(100 * attempt as u64)).await;
139//! continue;
140//! }
141//! Err(LinkError::Timeout) if attempt < 3 => {
142//! // NAT traversal may need multiple attempts
143//! continue;
144//! }
145//! Err(e) => return Err(e),
146//! }
147//! }
148//! Err(LinkError::ConnectionFailed("max retries exceeded".into()))
149//! }
150//! ```
151
152use std::fmt;
153use std::future::Future;
154use std::net::SocketAddr;
155use std::pin::Pin;
156use std::sync::Arc;
157use std::time::{Duration, Instant, SystemTime};
158
159use bytes::Bytes;
160use thiserror::Error;
161use tokio::sync::broadcast;
162
163use crate::nat_traversal_api::PeerId;
164
165// ============================================================================
166// Protocol Identifier
167// ============================================================================
168
/// Protocol identifier for multiplexing multiple overlays on a single transport.
///
/// A protocol is identified by a fixed 16-byte value, which keeps comparison and
/// hashing to plain byte operations. Identifiers built from text are NUL-padded on
/// the right, so [`fmt::Debug`] and [`fmt::Display`] can render them as readable
/// names while debugging.
///
/// # Examples
///
/// ```rust
/// use ant_quic::link_transport::ProtocolId;
///
/// // From a static string (padded/truncated to 16 bytes)
/// const DHT: ProtocolId = ProtocolId::from_static(b"saorsa-dht/1.0.0");
///
/// // From bytes
/// let proto = ProtocolId::new([0x73, 0x61, 0x6f, 0x72, 0x73, 0x61, 0x2d, 0x64,
///                              0x68, 0x74, 0x2f, 0x31, 0x2e, 0x30, 0x2e, 0x30]);
/// assert_eq!(DHT, proto);
/// ```
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct ProtocolId(pub [u8; 16]);

impl ProtocolId {
    /// Create a new protocol ID from raw bytes.
    #[inline]
    pub const fn new(bytes: [u8; 16]) -> Self {
        Self(bytes)
    }

    /// Create a protocol ID from a static byte string.
    ///
    /// The string is padded with zeros if shorter than 16 bytes,
    /// or truncated if longer.
    #[inline]
    pub const fn from_static(s: &[u8]) -> Self {
        let mut bytes = [0u8; 16];
        // A `while` loop is used because iterators are not usable in `const fn`.
        let mut i = 0;
        while i < s.len() && i < bytes.len() {
            bytes[i] = s[i];
            i += 1;
        }
        Self(bytes)
    }

    /// Get the raw bytes of this protocol ID.
    #[inline]
    pub const fn as_bytes(&self) -> &[u8; 16] {
        &self.0
    }

    /// The default protocol for connections without explicit protocol negotiation.
    pub const DEFAULT: Self = Self::from_static(b"ant-quic/default");

    /// Protocol ID for NAT traversal coordination messages.
    pub const NAT_TRAVERSAL: Self = Self::from_static(b"ant-quic/nat");

    /// Protocol ID for relay traffic.
    pub const RELAY: Self = Self::from_static(b"ant-quic/relay");

    /// Returns the identifier as text when that rendering is unambiguous.
    ///
    /// Only the trailing zero padding is stripped. If a zero byte remains inside
    /// the name, or the name is not valid UTF-8, `None` is returned so the caller
    /// falls back to hex; otherwise two distinct identifiers that differ only
    /// after an embedded NUL would print identically.
    fn as_text(&self) -> Option<&str> {
        let len = self
            .0
            .iter()
            .rposition(|&b| b != 0)
            .map_or(0, |last| last + 1);
        let name = &self.0[..len];
        if name.contains(&0) {
            return None;
        }
        std::str::from_utf8(name).ok()
    }

    /// Writes all 16 bytes as lowercase hexadecimal (two digits per byte).
    fn fmt_hex(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.0.iter().try_for_each(|b| write!(f, "{:02x}", b))
    }
}

impl Default for ProtocolId {
    /// Returns [`ProtocolId::DEFAULT`].
    fn default() -> Self {
        Self::DEFAULT
    }
}

impl fmt::Debug for ProtocolId {
    /// Formats as `ProtocolId("<name>")`, using the hex form of all 16 bytes
    /// when the identifier cannot be shown as text.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self.as_text() {
            Some(name) => write!(f, "ProtocolId({:?})", name),
            None => {
                f.write_str("ProtocolId(\"")?;
                self.fmt_hex(f)?;
                f.write_str("\")")
            }
        }
    }
}

impl fmt::Display for ProtocolId {
    /// Formats as the bare name, or as 32 hex digits when the identifier
    /// cannot be shown as text.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self.as_text() {
            Some(name) => f.write_str(name),
            None => self.fmt_hex(f),
        }
    }
}

impl From<&str> for ProtocolId {
    /// Builds an ID from the UTF-8 bytes of `s`, zero-padding or truncating
    /// to 16 bytes exactly like [`ProtocolId::from_static`].
    fn from(s: &str) -> Self {
        Self::from_static(s.as_bytes())
    }
}

impl From<[u8; 16]> for ProtocolId {
    /// Wraps the raw bytes without modification.
    fn from(bytes: [u8; 16]) -> Self {
        Self(bytes)
    }
}
268
269// ============================================================================
270// Peer Capabilities
271// ============================================================================
272
/// Coarse NAT classification used as a hint when choosing a connection strategy.
///
/// The default value is [`NatHint::Unknown`].
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NatHint {
    /// No NAT was detected: the peer has a public IP and is directly reachable.
    None,
    /// Full cone NAT, the easiest kind to traverse.
    FullCone,
    /// Address-restricted cone NAT.
    AddressRestrictedCone,
    /// Port-restricted cone NAT.
    PortRestrictedCone,
    /// Symmetric NAT, the hardest kind to traverse; a relay may be required.
    Symmetric,
    /// The NAT type has not been determined.
    #[default]
    Unknown,
}
290
291/// Capabilities and quality metrics for a connected peer.
292///
293/// This struct captures both static capabilities (what the peer can do)
294/// and dynamic metrics (how well the peer is performing).
295#[derive(Debug, Clone)]
296pub struct Capabilities {
297 /// Whether this peer can relay traffic for NAT traversal.
298 pub supports_relay: bool,
299
300 /// Whether this peer can coordinate NAT hole-punching.
301 pub supports_coordination: bool,
302
303 /// Observed external addresses for this peer.
304 pub observed_addrs: Vec<SocketAddr>,
305
306 /// Protocols this peer advertises support for.
307 pub protocols: Vec<ProtocolId>,
308
309 /// Last time we successfully communicated with this peer.
310 pub last_seen: SystemTime,
311
312 /// Median round-trip time in milliseconds (p50).
313 pub rtt_ms_p50: u32,
314
315 /// Estimated RTT jitter in milliseconds.
316 pub rtt_jitter_ms: u32,
317
318 /// Packet loss rate (0.0 to 1.0).
319 pub packet_loss: f32,
320
321 /// Inferred NAT type for connection strategy hints.
322 pub nat_type_hint: Option<NatHint>,
323
324 /// Peer's advertised bandwidth limit (bytes/sec), if any.
325 pub bandwidth_limit: Option<u64>,
326
327 /// Number of successful connections to this peer.
328 pub successful_connections: u32,
329
330 /// Number of failed connection attempts to this peer.
331 pub failed_connections: u32,
332
333 /// Whether this peer is currently connected.
334 pub is_connected: bool,
335}
336
337impl Default for Capabilities {
338 fn default() -> Self {
339 Self {
340 supports_relay: false,
341 supports_coordination: false,
342 observed_addrs: Vec::new(),
343 protocols: Vec::new(),
344 last_seen: SystemTime::UNIX_EPOCH,
345 rtt_ms_p50: 0,
346 rtt_jitter_ms: 0,
347 packet_loss: 0.0,
348 nat_type_hint: None,
349 bandwidth_limit: None,
350 successful_connections: 0,
351 failed_connections: 0,
352 is_connected: false,
353 }
354 }
355}
356
357impl Capabilities {
358 /// Create capabilities for a newly connected peer.
359 pub fn new_connected(addr: SocketAddr) -> Self {
360 Self {
361 observed_addrs: vec![addr],
362 last_seen: SystemTime::now(),
363 is_connected: true,
364 ..Default::default()
365 }
366 }
367
368 /// Calculate a quality score for peer selection (0.0 to 1.0).
369 ///
370 /// Higher scores indicate better peers for connection.
371 pub fn quality_score(&self) -> f32 {
372 let mut score = 0.5; // Base score
373
374 // RTT component (lower is better, max 300ms considered)
375 let rtt_score = 1.0 - (self.rtt_ms_p50 as f32 / 300.0).min(1.0);
376 score += rtt_score * 0.3;
377
378 // Packet loss component
379 let loss_score = 1.0 - self.packet_loss;
380 score += loss_score * 0.2;
381
382 // Connection success rate
383 let total = self.successful_connections + self.failed_connections;
384 if total > 0 {
385 let success_rate = self.successful_connections as f32 / total as f32;
386 score += success_rate * 0.2;
387 }
388
389 // Capability bonus
390 if self.supports_relay {
391 score += 0.05;
392 }
393 if self.supports_coordination {
394 score += 0.05;
395 }
396
397 // NAT type penalty
398 if let Some(nat) = self.nat_type_hint {
399 match nat {
400 NatHint::None | NatHint::FullCone => {}
401 NatHint::AddressRestrictedCone | NatHint::PortRestrictedCone => {
402 score -= 0.05;
403 }
404 NatHint::Symmetric => {
405 score -= 0.15;
406 }
407 NatHint::Unknown => {
408 score -= 0.02;
409 }
410 }
411 }
412
413 score.clamp(0.0, 1.0)
414 }
415
416 /// Check if this peer supports a specific protocol.
417 pub fn supports_protocol(&self, proto: &ProtocolId) -> bool {
418 self.protocols.contains(proto)
419 }
420}
421
422// ============================================================================
423// Link Events
424// ============================================================================
425
/// Why a peer connection ended.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum DisconnectReason {
    /// The local side shut the connection down cleanly.
    LocalClose,
    /// The remote side shut the connection down cleanly.
    RemoteClose,
    /// The connection timed out.
    Timeout,
    /// A transport error occurred; the payload carries its description.
    TransportError(String),
    /// The connection ended with an application-level error code.
    ApplicationError(u64),
    /// The connection was reset.
    Reset,
}
442
443/// Events emitted by the link transport layer.
444///
445/// These events notify the overlay about significant transport-level changes.
446#[derive(Debug, Clone)]
447pub enum LinkEvent {
448 /// A new peer has connected.
449 PeerConnected {
450 /// The connected peer's ID.
451 peer: PeerId,
452 /// Initial capabilities (may be updated later).
453 caps: Capabilities,
454 },
455
456 /// A peer has disconnected.
457 PeerDisconnected {
458 /// The disconnected peer's ID.
459 peer: PeerId,
460 /// Reason for disconnection.
461 reason: DisconnectReason,
462 },
463
464 /// Our observed external address has been updated.
465 ExternalAddressUpdated {
466 /// The new external address.
467 addr: SocketAddr,
468 },
469
470 /// A peer's capabilities have been updated.
471 CapabilityUpdated {
472 /// The peer whose capabilities changed.
473 peer: PeerId,
474 /// Updated capabilities.
475 caps: Capabilities,
476 },
477
478 /// A relay request has been received.
479 RelayRequest {
480 /// Peer requesting the relay.
481 from: PeerId,
482 /// Target peer for the relay.
483 to: PeerId,
484 /// Bytes remaining in relay budget.
485 budget_bytes: u64,
486 },
487
488 /// A NAT traversal coordination request has been received.
489 CoordinationRequest {
490 /// First peer in the coordination.
491 peer_a: PeerId,
492 /// Second peer in the coordination.
493 peer_b: PeerId,
494 /// Coordination round number.
495 round: u64,
496 },
497
498 /// The bootstrap cache has been updated.
499 BootstrapCacheUpdated {
500 /// Number of peers in the cache.
501 peer_count: usize,
502 },
503}
504
505// ============================================================================
506// Link Transport Errors
507// ============================================================================
508
509/// Errors that can occur during link transport operations.
510#[derive(Debug, Error, Clone)]
511pub enum LinkError {
512 /// The connection was closed.
513 #[error("connection closed")]
514 ConnectionClosed,
515
516 /// Failed to establish connection.
517 #[error("connection failed: {0}")]
518 ConnectionFailed(String),
519
520 /// The peer is not known/reachable.
521 #[error("peer not found: {0}")]
522 PeerNotFound(String),
523
524 /// Protocol negotiation failed.
525 #[error("protocol not supported: {0}")]
526 ProtocolNotSupported(ProtocolId),
527
528 /// A timeout occurred.
529 #[error("operation timed out")]
530 Timeout,
531
532 /// The stream was reset by the peer.
533 #[error("stream reset: error code {0}")]
534 StreamReset(u64),
535
536 /// An I/O error occurred.
537 #[error("I/O error: {0}")]
538 Io(String),
539
540 /// The transport is shutting down.
541 #[error("transport shutdown")]
542 Shutdown,
543
544 /// Rate limit exceeded.
545 #[error("rate limit exceeded")]
546 RateLimited,
547
548 /// Internal error.
549 #[error("internal error: {0}")]
550 Internal(String),
551}
552
553impl From<std::io::Error> for LinkError {
554 fn from(e: std::io::Error) -> Self {
555 Self::Io(e.to_string())
556 }
557}
558
559/// Result type for link transport operations.
560pub type LinkResult<T> = Result<T, LinkError>;
561
562// ============================================================================
563// Link Connection Trait
564// ============================================================================
565
/// A pinned, heap-allocated, `Send` future yielding `T`, valid for lifetime `'a`.
///
/// Used as the return type of the async-style trait methods in this module.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
568
569/// A boxed stream for async iteration.
570pub type BoxStream<'a, T> = Pin<Box<dyn futures_util::Stream<Item = T> + Send + 'a>>;
571
572/// A connection to a remote peer.
573///
574/// This trait abstracts a single QUIC connection, providing methods to
575/// open streams and send/receive datagrams. Connections are obtained via
576/// [`LinkTransport::dial`] or [`LinkTransport::accept`].
577///
578/// # Stream Types
579///
580/// - **Bidirectional streams** (`open_bi`): Both endpoints can send and receive.
581/// Use for request/response patterns.
582/// - **Unidirectional streams** (`open_uni`): Only the opener can send.
583/// Use for notifications or one-way data transfer.
584/// - **Datagrams** (`send_datagram`): Unreliable, unordered messages.
585/// Use for real-time data where latency > reliability.
586///
587/// # Connection Lifecycle
588///
589/// 1. Connection established (via dial or accept)
590/// 2. Open streams as needed
591/// 3. Close gracefully with `close()` or let it drop
592pub trait LinkConn: Send + Sync {
593 /// Get the remote peer's cryptographic identity.
594 ///
595 /// This is stable across reconnections and network changes.
596 fn peer(&self) -> PeerId;
597
598 /// Get the remote peer's current network address.
599 ///
600 /// Note: This may change during the connection lifetime due to
601 /// NAT rebinding or connection migration.
602 fn remote_addr(&self) -> SocketAddr;
603
604 /// Open a unidirectional stream (send only).
605 ///
606 /// The remote peer will receive this stream via their `accept_uni()`.
607 /// Use for one-way messages like notifications or log streams.
608 ///
609 /// # Example
610 /// ```rust,ignore
611 /// let mut stream = conn.open_uni().await?;
612 /// stream.write_all(b"notification").await?;
613 /// stream.finish()?; // Signal end of stream
614 /// ```
615 fn open_uni(&self) -> BoxFuture<'_, LinkResult<Box<dyn LinkSendStream>>>;
616
617 /// Open a bidirectional stream for request/response communication.
618 ///
619 /// Returns a (send, recv) pair. Both sides can write and read.
620 /// Use for RPC, file transfers, or any interactive protocol.
621 ///
622 /// # Example
623 /// ```rust,ignore
624 /// let (mut send, mut recv) = conn.open_bi().await?;
625 /// send.write_all(b"request").await?;
626 /// send.finish()?;
627 /// let response = recv.read_to_end(4096).await?;
628 /// ```
629 fn open_bi(
630 &self,
631 ) -> BoxFuture<'_, LinkResult<(Box<dyn LinkSendStream>, Box<dyn LinkRecvStream>)>>;
632
633 /// Send an unreliable datagram to the peer.
634 ///
635 /// Datagrams are:
636 /// - **Unreliable**: May be dropped without notification
637 /// - **Unordered**: May arrive out of order
638 /// - **Size-limited**: Must fit in a single QUIC packet (~1200 bytes)
639 ///
640 /// Use for heartbeats, metrics, or real-time data where occasional
641 /// loss is acceptable.
642 fn send_datagram(&self, data: Bytes) -> LinkResult<()>;
643
644 /// Receive datagrams from the peer.
645 ///
646 /// Returns a stream of datagrams. Each datagram is delivered as-is
647 /// (no framing). The stream ends when the connection closes.
648 fn recv_datagrams(&self) -> BoxStream<'_, Bytes>;
649
650 /// Close the connection gracefully.
651 ///
652 /// # Parameters
653 /// - `error_code`: Application-defined error code (0 = normal close)
654 /// - `reason`: Human-readable reason for debugging
655 fn close(&self, error_code: u64, reason: &str);
656
657 /// Check if the connection is still open.
658 ///
659 /// Returns false after the connection has been closed (locally or remotely)
660 /// or if a fatal error occurred.
661 fn is_open(&self) -> bool;
662
663 /// Get current connection statistics.
664 ///
665 /// Useful for monitoring connection health and debugging performance.
666 fn stats(&self) -> ConnectionStats;
667}
668
/// Statistics for a connection.
///
/// Updated in real time as the connection handles data. Typical uses:
/// - Monitoring connection health
/// - Detecting congestion (high RTT, packet loss)
/// - Debugging performance issues
///
/// # Typical Values
///
/// | Metric | Good | Concerning | Critical |
/// |--------|------|------------|----------|
/// | RTT | <50ms | 50-200ms | >500ms |
/// | Packet loss | <0.1% | 0.1-1% | >5% |
///
/// # Example
///
/// `packets_lost` is an absolute count and this struct carries no count of
/// packets sent, so a loss *rate* cannot be derived from these fields alone.
///
/// ```rust,ignore
/// let stats = conn.stats();
/// if stats.rtt > Duration::from_millis(200) {
///     log::warn!("High latency: {:?}", stats.rtt);
/// }
/// if stats.packets_lost > 0 {
///     log::warn!("{} packets lost so far", stats.packets_lost);
/// }
/// ```
#[derive(Debug, Default, Clone)]
pub struct ConnectionStats {
    /// Total bytes sent on this connection (including retransmits).
    pub bytes_sent: u64,
    /// Total bytes received on this connection.
    pub bytes_received: u64,
    /// Current smoothed round-trip time estimate,
    /// calculated using QUIC's RTT estimation algorithm.
    pub rtt: Duration,
    /// How long this connection has been established.
    pub connected_duration: Duration,
    /// Total number of streams opened (bidirectional + unidirectional).
    pub streams_opened: u64,
    /// Estimated number of packets lost during transmission.
    /// High values indicate congestion or poor network conditions.
    pub packets_lost: u64,
}
710
711/// A send stream for writing data to a peer.
712pub trait LinkSendStream: Send + Sync {
713 /// Write data to the stream.
714 fn write<'a>(&'a mut self, data: &'a [u8]) -> BoxFuture<'a, LinkResult<usize>>;
715
716 /// Write all data to the stream.
717 fn write_all<'a>(&'a mut self, data: &'a [u8]) -> BoxFuture<'a, LinkResult<()>>;
718
719 /// Finish the stream (signal end of data).
720 fn finish(&mut self) -> LinkResult<()>;
721
722 /// Reset the stream with an error code.
723 fn reset(&mut self, error_code: u64) -> LinkResult<()>;
724
725 /// Get the stream ID.
726 fn id(&self) -> u64;
727}
728
729/// A receive stream for reading data from a peer.
730pub trait LinkRecvStream: Send + Sync {
731 /// Read data from the stream.
732 fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> BoxFuture<'a, LinkResult<Option<usize>>>;
733
734 /// Read all data until the stream ends.
735 fn read_to_end(&mut self, size_limit: usize) -> BoxFuture<'_, LinkResult<Vec<u8>>>;
736
737 /// Stop receiving data (signal we don't want more).
738 fn stop(&mut self, error_code: u64) -> LinkResult<()>;
739
740 /// Get the stream ID.
741 fn id(&self) -> u64;
742}
743
744// ============================================================================
745// Link Transport Trait
746// ============================================================================
747
748/// Incoming connection stream.
749pub type Incoming<C> = BoxStream<'static, LinkResult<C>>;
750
751/// The primary transport abstraction for overlay networks.
752///
753/// This trait provides everything an overlay needs to establish connections,
754/// send/receive data, and monitor the transport layer.
755///
756/// # Implementation Notes
757///
758/// Implementors should:
759/// - Handle NAT traversal transparently
760/// - Maintain a peer table with capabilities
761/// - Emit events for connection state changes
762/// - Support protocol multiplexing
763///
764/// # Example Implementation
765///
766/// The default implementation wraps `P2pEndpoint`:
767///
768/// ```rust,ignore
769/// let config = P2pConfig::builder()
770/// .bind_addr("0.0.0.0:0".parse()?)
771/// .build()?;
772/// let endpoint = P2pEndpoint::new(config).await?;
773/// let transport: Arc<dyn LinkTransport<Conn = P2pLinkConn>> = Arc::new(endpoint);
774/// ```
775pub trait LinkTransport: Send + Sync + 'static {
776 /// The connection type returned by this transport.
777 type Conn: LinkConn + 'static;
778
779 /// Get our local peer identity.
780 ///
781 /// This is our stable cryptographic identity, derived from our key pair.
782 /// It remains constant across restarts and network changes.
783 fn local_peer(&self) -> PeerId;
784
785 /// Get our externally observed address, if known.
786 ///
787 /// Returns the address other peers see when we connect to them.
788 /// This is discovered via:
789 /// - OBSERVED_ADDRESS frames from connected peers
790 /// - NAT traversal address discovery
791 ///
792 /// Returns `None` if we haven't connected to any peers yet or
793 /// if we're behind a symmetric NAT that changes our external port.
794 fn external_address(&self) -> Option<SocketAddr>;
795
796 /// Get all known peers with their capabilities.
797 ///
798 /// Includes:
799 /// - Currently connected peers (`caps.is_connected = true`)
800 /// - Previously connected peers still in bootstrap cache
801 /// - Peers learned from relay/coordination traffic
802 ///
803 /// Use `Capabilities::quality_score()` to rank peers for selection.
804 fn peer_table(&self) -> Vec<(PeerId, Capabilities)>;
805
806 /// Get capabilities for a specific peer.
807 ///
808 /// Returns `None` if the peer is not known.
809 fn peer_capabilities(&self, peer: &PeerId) -> Option<Capabilities>;
810
811 /// Subscribe to transport-level events.
812 ///
813 /// Events include peer connections/disconnections, address changes,
814 /// and capability updates. Use for maintaining overlay state.
815 ///
816 /// Multiple subscribers are supported via broadcast channel.
817 fn subscribe(&self) -> broadcast::Receiver<LinkEvent>;
818
819 /// Accept incoming connections for a specific protocol.
820 ///
821 /// Returns a stream of connections from peers that want to speak
822 /// the specified protocol. Register your protocol first with
823 /// `register_protocol()`.
824 ///
825 /// # Example
826 /// ```rust,ignore
827 /// let mut incoming = transport.accept(MY_PROTOCOL);
828 /// while let Some(result) = incoming.next().await {
829 /// if let Ok(conn) = result {
830 /// tokio::spawn(handle_connection(conn));
831 /// }
832 /// }
833 /// ```
834 fn accept(&self, proto: ProtocolId) -> Incoming<Self::Conn>;
835
836 /// Dial a peer by their PeerId (preferred method).
837 ///
838 /// Uses the peer table to find known addresses for this peer.
839 /// NAT traversal is handled automatically - if direct connection
840 /// fails, coordination and hole-punching are attempted.
841 ///
842 /// # Errors
843 /// - `PeerNotFound`: Peer not in table (need to bootstrap)
844 /// - `ConnectionFailed`: Network error (may be transient)
845 /// - `Timeout`: NAT traversal timed out (retry may succeed)
846 fn dial(&self, peer: PeerId, proto: ProtocolId) -> BoxFuture<'_, LinkResult<Self::Conn>>;
847
848 /// Dial a peer by direct address (for bootstrapping).
849 ///
850 /// Use when you don't know the peer's ID yet, such as when
851 /// connecting to a known seed address to join the network.
852 ///
853 /// After connection, the peer's ID will be available via
854 /// `conn.peer()`.
855 fn dial_addr(
856 &self,
857 addr: SocketAddr,
858 proto: ProtocolId,
859 ) -> BoxFuture<'_, LinkResult<Self::Conn>>;
860
861 /// Get protocols we advertise as supported.
862 fn supported_protocols(&self) -> Vec<ProtocolId>;
863
864 /// Register a protocol as supported.
865 ///
866 /// Call this before `accept()` to receive connections for the protocol.
867 /// Registered protocols are advertised to connected peers.
868 fn register_protocol(&self, proto: ProtocolId);
869
870 /// Unregister a protocol.
871 ///
872 /// Stops accepting new connections for this protocol. Existing
873 /// connections are not affected.
874 fn unregister_protocol(&self, proto: ProtocolId);
875
876 /// Check if we have an active connection to a peer.
877 fn is_connected(&self, peer: &PeerId) -> bool;
878
879 /// Get the count of active connections.
880 fn active_connections(&self) -> usize;
881
882 /// Gracefully shutdown the transport.
883 ///
884 /// Closes all connections, stops accepting new ones, and flushes
885 /// the bootstrap cache to disk. Pending operations will complete
886 /// or error.
887 ///
888 /// Call this before exiting to ensure clean shutdown.
889 fn shutdown(&self) -> BoxFuture<'_, ()>;
890}
891
892// ============================================================================
893// P2pEndpoint Implementation
894// ============================================================================
895
896// The implementation of LinkTransport for P2pEndpoint is in a separate file
897// to keep this module focused on the trait definitions.
898
#[cfg(test)]
mod tests {
    use super::*;

    /// A short name is copied verbatim and the remainder is zero-padded.
    #[test]
    fn test_protocol_id_from_string() {
        let id = ProtocolId::from("saorsa-dht/1.0");
        let (name, padding) = id.0.split_at(14);
        assert_eq!(name, b"saorsa-dht/1.0");
        assert_eq!(padding.len(), 2);
        assert!(padding.iter().all(|&byte| byte == 0));
    }

    /// A name longer than 16 bytes keeps only its first 16 bytes.
    #[test]
    fn test_protocol_id_truncation() {
        let id = ProtocolId::from("this-is-a-very-long-protocol-name");
        assert_eq!(&id.0, b"this-is-a-very-l");
    }

    /// `Display` prints the name without the zero padding.
    #[test]
    fn test_protocol_id_display() {
        let id = ProtocolId::from("test/1.0");
        assert_eq!(id.to_string(), "test/1.0");
    }

    /// The score falls as RTT grows and when the peer sits behind a symmetric NAT.
    #[test]
    fn test_capabilities_quality_score() {
        // Helper: score of otherwise-default capabilities with the given RTT/NAT hint.
        fn score_with(rtt_ms: u32, nat: Option<NatHint>) -> f32 {
            let caps = Capabilities {
                rtt_ms_p50: rtt_ms,
                nat_type_hint: nat,
                ..Capabilities::default()
            };
            caps.quality_score()
        }

        // Default has perfect RTT (0ms) and no packet loss, so the score is high:
        // 0.5 (base) + 0.3 (RTT: 1.0*0.3) + 0.2 (loss: 1.0*0.2) = 1.0
        let base_score = Capabilities::default().quality_score();
        assert!(
            (0.9..=1.0).contains(&base_score),
            "base_score = {}",
            base_score
        );

        // 150ms is 50% of the maximum RTT considered.
        let worse_rtt_score = score_with(150, None);
        assert!(
            worse_rtt_score < base_score,
            "worse RTT should reduce score"
        );

        // 500ms is beyond the maximum RTT considered.
        let bad_rtt_score = score_with(500, None);
        assert!(
            bad_rtt_score < worse_rtt_score,
            "bad RTT should reduce score more"
        );

        // Same RTT on both sides so that only the NAT hint differs.
        let nat_score = score_with(50, Some(NatHint::Symmetric));
        let no_nat_score = score_with(50, None);
        assert!(
            nat_score < no_nat_score,
            "symmetric NAT should reduce score"
        );
    }

    /// Only protocols present in the advertised list are reported as supported.
    #[test]
    fn test_capabilities_supports_protocol() {
        let dht = ProtocolId::from("dht/1.0");
        let gossip = ProtocolId::from("gossip/1.0");

        let caps = Capabilities {
            protocols: vec![dht],
            ..Capabilities::default()
        };

        assert!(caps.supports_protocol(&dht));
        assert!(!caps.supports_protocol(&gossip));
    }
}
977}