pub mod tcp;
pub mod tor;
pub mod udp;
#[cfg(feature = "sim-transport")]
pub mod sim;
#[cfg(any(target_os = "linux", target_os = "macos"))]
pub mod ethernet;
#[cfg(target_os = "linux")]
pub mod ble;
#[cfg(target_os = "linux")]
use ble::DefaultBleTransport;
#[cfg(any(target_os = "linux", target_os = "macos"))]
use ethernet::EthernetTransport;
use secp256k1::XOnlyPublicKey;
#[cfg(feature = "sim-transport")]
use sim::SimTransport;
use std::fmt;
use std::net::SocketAddr;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tcp::TcpTransport;
use thiserror::Error;
use tor::TorTransport;
use tor::control::TorMonitoringInfo;
use udp::UdpTransport;
/// A packet handed up from a transport, tagged with where it came from
/// and when it was received.
#[derive(Clone, Debug)]
pub struct ReceivedPacket {
    /// Transport that produced the packet.
    pub transport_id: TransportId,
    /// Address of the remote sender.
    pub remote_addr: TransportAddr,
    /// Raw packet bytes.
    pub data: Vec<u8>,
    /// Receive time in milliseconds since the Unix epoch.
    pub timestamp_ms: u64,
    /// Profiling stamp taken when the packet value is constructed.
    #[doc(hidden)]
    pub trace_enqueued_at: Option<Instant>,
}

impl ReceivedPacket {
    /// Builds a packet stamped with the current wall-clock time.
    ///
    /// If the system clock reads earlier than the Unix epoch the timestamp
    /// falls back to `0`.
    pub fn new(transport_id: TransportId, remote_addr: TransportAddr, data: Vec<u8>) -> Self {
        let now_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
            Ok(since_epoch) => since_epoch.as_millis() as u64,
            Err(_) => 0,
        };
        Self::with_timestamp(transport_id, remote_addr, data, now_ms)
    }

    /// Builds a packet with a caller-supplied timestamp (milliseconds).
    pub fn with_timestamp(
        transport_id: TransportId,
        remote_addr: TransportAddr,
        data: Vec<u8>,
        timestamp_ms: u64,
    ) -> Self {
        let trace_enqueued_at = crate::perf_profile::stamp();
        ReceivedPacket {
            transport_id,
            remote_addr,
            data,
            timestamp_ms,
            trace_enqueued_at,
        }
    }
}
/// Sending half of the channel that carries [`ReceivedPacket`]s.
pub type PacketTx = tokio::sync::mpsc::UnboundedSender<ReceivedPacket>;
/// Receiving half of the channel that carries [`ReceivedPacket`]s.
pub type PacketRx = tokio::sync::mpsc::UnboundedReceiver<ReceivedPacket>;
/// Creates a connected [`PacketTx`] / [`PacketRx`] pair.
///
/// The `_buffer` argument is ignored: the returned channel is unbounded,
/// so sends never apply backpressure.
pub fn packet_channel(_buffer: usize) -> (PacketTx, PacketRx) {
tokio::sync::mpsc::unbounded_channel()
}
/// Numeric identifier of a transport, wrapping a `u32`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct TransportId(u32);

impl TransportId {
    /// Wraps a raw `u32` as a transport identifier.
    pub fn new(id: u32) -> Self {
        TransportId(id)
    }

    /// Returns the raw numeric value.
    pub fn as_u32(&self) -> u32 {
        let TransportId(raw) = *self;
        raw
    }
}

impl fmt::Display for TransportId {
    /// Renders the identifier as `transport:<id>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("transport:{}", self.as_u32()))
    }
}
/// Numeric identifier of a link, wrapping a `u64`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct LinkId(u64);

impl LinkId {
    /// Wraps a raw `u64` as a link identifier.
    pub fn new(id: u64) -> Self {
        LinkId(id)
    }

    /// Returns the raw numeric value.
    pub fn as_u64(&self) -> u64 {
        let LinkId(raw) = *self;
        raw
    }
}

impl fmt::Display for LinkId {
    /// Renders the identifier as `link:<id>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("link:{}", self.as_u64()))
    }
}
/// Errors reported by transports and by [`TransportHandle`] operations.
///
/// Display text for each variant comes from its `#[error(...)]` attribute.
#[derive(Debug, Error)]
pub enum TransportError {
/// An operation was attempted on a transport that has not been started.
#[error("transport not started")]
NotStarted,
/// `start` was requested on a transport that is already started.
#[error("transport already started")]
AlreadyStarted,
/// Starting the transport failed; the string carries the reason.
#[error("transport failed to start: {0}")]
StartFailed(String),
/// Shutting the transport down failed; the string carries the reason.
#[error("transport shutdown failed: {0}")]
ShutdownFailed(String),
/// A link-level failure; the string carries the reason.
#[error("link failed: {0}")]
LinkFailed(String),
/// Sending failed; the string carries the reason.
#[error("send failed: {0}")]
SendFailed(String),
/// Receiving failed; the string carries the reason.
#[error("receive failed: {0}")]
RecvFailed(String),
/// The transport address could not be interpreted or resolved.
#[error("invalid transport address: {0}")]
InvalidAddress(String),
/// The packet is larger than the MTU it was checked against.
#[error("mtu exceeded: packet {packet_size} > mtu {mtu}")]
MtuExceeded { packet_size: usize, mtu: u16 },
/// The operation timed out.
#[error("transport timeout")]
Timeout,
/// The remote side refused the connection.
#[error("connection refused")]
ConnectionRefused,
/// The requested transport or operation is not supported.
#[error("transport not supported: {0}")]
NotSupported(String),
/// Wrapped I/O error; `#[from]` lets `?` convert `std::io::Error` directly.
#[error("io error: {0}")]
Io(#[from] std::io::Error),
}
/// Static description of a kind of transport: its name and two
/// behavioural flags.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TransportType {
    /// Short lowercase name; this is also the `Display` output.
    pub name: &'static str,
    /// `true` when the transport is connection-oriented.
    pub connection_oriented: bool,
    /// `true` when the transport is flagged as reliable.
    pub reliable: bool,
}

impl TransportType {
    /// UDP: connectionless, unreliable.
    pub const UDP: Self = Self {
        name: "udp",
        connection_oriented: false,
        reliable: false,
    };

    /// TCP: connection-oriented, reliable.
    pub const TCP: Self = Self {
        name: "tcp",
        connection_oriented: true,
        reliable: true,
    };

    /// Ethernet: connectionless, unreliable.
    pub const ETHERNET: Self = Self {
        name: "ethernet",
        connection_oriented: false,
        reliable: false,
    };

    /// Wi-Fi: connectionless, unreliable.
    pub const WIFI: Self = Self {
        name: "wifi",
        connection_oriented: false,
        reliable: false,
    };

    /// Tor: connection-oriented, reliable.
    pub const TOR: Self = Self {
        name: "tor",
        connection_oriented: true,
        reliable: true,
    };

    /// Serial: connectionless, flagged reliable.
    pub const SERIAL: Self = Self {
        name: "serial",
        connection_oriented: false,
        reliable: true,
    };

    /// BLE: connection-oriented, flagged reliable.
    pub const BLE: Self = Self {
        name: "ble",
        connection_oriented: true,
        reliable: true,
    };

    /// Simulated transport: connectionless, unreliable.
    #[cfg(feature = "sim-transport")]
    pub const SIM: Self = Self {
        name: "sim",
        connection_oriented: false,
        reliable: false,
    };

    /// Returns `true` when the transport is not connection-oriented.
    pub fn is_connectionless(&self) -> bool {
        let oriented = self.connection_oriented;
        !oriented
    }
}

impl fmt::Display for TransportType {
    /// Writes the transport's `name` verbatim.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.name)
    }
}
/// Lifecycle state of a transport.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TransportState {
    Configured,
    Starting,
    Up,
    Down,
    Failed,
}

impl TransportState {
    /// `true` only in the `Up` state.
    pub fn is_operational(&self) -> bool {
        *self == TransportState::Up
    }

    /// `true` when a start may be attempted: `Configured`, `Down` or `Failed`.
    pub fn can_start(&self) -> bool {
        match self {
            TransportState::Configured | TransportState::Down | TransportState::Failed => true,
            TransportState::Starting | TransportState::Up => false,
        }
    }

    /// `true` only in the `Failed` state.
    pub fn is_terminal(&self) -> bool {
        *self == TransportState::Failed
    }
}

impl fmt::Display for TransportState {
    /// Writes the lowercase state name.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            TransportState::Configured => "configured",
            TransportState::Starting => "starting",
            TransportState::Up => "up",
            TransportState::Down => "down",
            TransportState::Failed => "failed",
        })
    }
}
/// Lifecycle state of a single link.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LinkState {
    Connecting,
    Connected,
    Disconnected,
    Failed,
}

impl LinkState {
    /// `true` only in the `Connected` state.
    pub fn is_operational(&self) -> bool {
        *self == LinkState::Connected
    }

    /// `true` for `Disconnected` and `Failed`.
    pub fn is_terminal(&self) -> bool {
        match self {
            LinkState::Disconnected | LinkState::Failed => true,
            LinkState::Connecting | LinkState::Connected => false,
        }
    }
}

impl fmt::Display for LinkState {
    /// Writes the lowercase state name.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            LinkState::Connecting => "connecting",
            LinkState::Connected => "connected",
            LinkState::Disconnected => "disconnected",
            LinkState::Failed => "failed",
        })
    }
}
/// Direction of a link: `Outbound` or `Inbound`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LinkDirection {
    Outbound,
    Inbound,
}

impl fmt::Display for LinkDirection {
    /// Writes `outbound` or `inbound`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match *self {
            LinkDirection::Inbound => "inbound",
            LinkDirection::Outbound => "outbound",
        };
        f.write_str(label)
    }
}
/// Opaque transport address stored as raw bytes.
///
/// The bytes may or may not be valid UTF-8; formatting falls back to a
/// byte-oriented rendering when they are not.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct TransportAddr(Vec<u8>);

impl TransportAddr {
    /// Takes ownership of `bytes` as the address.
    pub fn new(bytes: Vec<u8>) -> Self {
        TransportAddr(bytes)
    }

    /// Copies `bytes` into a new address.
    pub fn from_bytes(bytes: &[u8]) -> Self {
        Self::new(bytes.to_owned())
    }

    /// Copies the UTF-8 bytes of `s` into a new address.
    pub fn from_string(s: &str) -> Self {
        Self::from_bytes(s.as_bytes())
    }

    /// Stores the `Display` rendering of `addr` as the address bytes.
    pub fn from_socket_addr(addr: std::net::SocketAddr) -> Self {
        use std::io::Write;

        // Formats straight into a pre-sized byte buffer.
        let mut rendered: Vec<u8> = Vec::with_capacity(56);
        write!(rendered, "{}", addr).expect("Vec<u8>::write_fmt is infallible");
        TransportAddr(rendered)
    }

    /// Borrows the raw address bytes.
    pub fn as_bytes(&self) -> &[u8] {
        self.0.as_slice()
    }

    /// Returns the address as text, or `None` when it is not valid UTF-8.
    pub fn as_str(&self) -> Option<&str> {
        match std::str::from_utf8(self.as_bytes()) {
            Ok(text) => Some(text),
            Err(_) => None,
        }
    }

    /// Number of bytes in the address.
    pub fn len(&self) -> usize {
        self.as_bytes().len()
    }

    /// `true` when the address holds no bytes.
    pub fn is_empty(&self) -> bool {
        self.as_bytes().is_empty()
    }
}

impl fmt::Debug for TransportAddr {
    /// Shows the text form in quotes when the bytes are UTF-8, otherwise
    /// the byte vector's own `Debug` output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if let Some(text) = self.as_str() {
            write!(f, "TransportAddr(\"{}\")", text)
        } else {
            write!(f, "TransportAddr({:?})", self.0)
        }
    }
}

impl fmt::Display for TransportAddr {
    /// Writes the text form when the bytes are UTF-8, otherwise lowercase
    /// hex with two digits per byte.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if let Some(text) = self.as_str() {
            return f.write_str(text);
        }
        self.0
            .iter()
            .try_for_each(|octet| write!(f, "{:02x}", octet))
    }
}

impl From<&str> for TransportAddr {
    /// Copies the string's bytes into an address.
    fn from(s: &str) -> Self {
        TransportAddr::from_string(s)
    }
}

impl From<String> for TransportAddr {
    /// Reuses the string's buffer as the address bytes.
    fn from(s: String) -> Self {
        TransportAddr::new(s.into_bytes())
    }
}
/// Per-link traffic counters and a smoothed round-trip-time estimate.
#[derive(Clone, Debug, Default)]
pub struct LinkStats {
    /// Number of packets recorded via [`LinkStats::record_sent`].
    pub packets_sent: u64,
    /// Number of packets recorded via [`LinkStats::record_recv`].
    pub packets_recv: u64,
    /// Total bytes recorded as sent.
    pub bytes_sent: u64,
    /// Total bytes recorded as received.
    pub bytes_recv: u64,
    /// Timestamp (ms) passed to the latest `record_recv`; `0` means never.
    pub last_recv_ms: u64,
    /// Smoothed RTT; `None` until the first `update_rtt`.
    rtt_estimate: Option<Duration>,
    /// Loss rate; not modified by any method in this impl.
    pub loss_rate: f32,
    /// Throughput estimate; not modified by any method in this impl.
    pub throughput_estimate: u64,
}

impl LinkStats {
    /// Creates zeroed statistics.
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Counts one sent packet of `bytes` bytes.
    pub fn record_sent(&mut self, bytes: usize) {
        let size = bytes as u64;
        self.packets_sent += 1;
        self.bytes_sent += size;
    }

    /// Counts one received packet of `bytes` bytes and remembers
    /// `timestamp_ms` as the latest receive time.
    pub fn record_recv(&mut self, bytes: usize, timestamp_ms: u64) {
        let size = bytes as u64;
        self.packets_recv += 1;
        self.bytes_recv += size;
        self.last_recv_ms = timestamp_ms;
    }

    /// Current smoothed RTT, if any sample has been recorded.
    pub fn rtt_estimate(&self) -> Option<Duration> {
        self.rtt_estimate
    }

    /// Feeds a new RTT sample into the estimate.
    ///
    /// The first sample is stored as-is; later samples are blended with an
    /// exponential moving average that weights the new sample at 0.2.
    pub fn update_rtt(&mut self, rtt: Duration) {
        let blended = match self.rtt_estimate {
            None => rtt,
            Some(previous) => {
                let alpha = 0.2;
                let sample_nanos = rtt.as_nanos() as f64;
                let previous_nanos = previous.as_nanos() as f64;
                let mixed = (alpha * sample_nanos + (1.0 - alpha) * previous_nanos) as u64;
                Duration::from_nanos(mixed)
            }
        };
        self.rtt_estimate = Some(blended);
    }

    /// Milliseconds between the last receive and `current_time_ms`.
    ///
    /// Returns `u64::MAX` when nothing has been received (`last_recv_ms`
    /// is `0`); saturates at `0` if `current_time_ms` is earlier than the
    /// last receive.
    pub fn time_since_recv(&self, current_time_ms: u64) -> u64 {
        match self.last_recv_ms {
            0 => u64::MAX,
            last_seen => current_time_ms.saturating_sub(last_seen),
        }
    }

    /// Restores every field to its default value.
    pub fn reset(&mut self) {
        *self = Self::new();
    }
}
/// State tracked for one link to a remote address over a transport.
#[derive(Clone, Debug)]
pub struct Link {
    /// Identifier of this link.
    link_id: LinkId,
    /// Transport the link runs over.
    transport_id: TransportId,
    /// Address of the remote end.
    remote_addr: TransportAddr,
    /// Whether the link is outbound or inbound.
    direction: LinkDirection,
    /// Current lifecycle state.
    state: LinkState,
    /// RTT used until a measured estimate exists.
    base_rtt: Duration,
    /// Traffic counters and measured RTT.
    stats: LinkStats,
    /// Creation timestamp; `0` means "not set".
    created_at: u64,
}

impl Link {
    /// Creates a link in the `Connecting` state with fresh statistics and
    /// an unset (`0`) creation timestamp.
    pub fn new(
        link_id: LinkId,
        transport_id: TransportId,
        remote_addr: TransportAddr,
        direction: LinkDirection,
        base_rtt: Duration,
    ) -> Self {
        Link {
            state: LinkState::Connecting,
            stats: LinkStats::new(),
            created_at: 0,
            link_id,
            transport_id,
            remote_addr,
            direction,
            base_rtt,
        }
    }

    /// Same as [`Link::new`] but records `created_at` as the creation time.
    pub fn new_with_timestamp(
        link_id: LinkId,
        transport_id: TransportId,
        remote_addr: TransportAddr,
        direction: LinkDirection,
        base_rtt: Duration,
        created_at: u64,
    ) -> Self {
        Link {
            created_at,
            ..Self::new(link_id, transport_id, remote_addr, direction, base_rtt)
        }
    }

    /// Same as [`Link::new`] but starts in the `Connected` state.
    pub fn connectionless(
        link_id: LinkId,
        transport_id: TransportId,
        remote_addr: TransportAddr,
        direction: LinkDirection,
        base_rtt: Duration,
    ) -> Self {
        Link {
            state: LinkState::Connected,
            ..Self::new(link_id, transport_id, remote_addr, direction, base_rtt)
        }
    }

    /// Identifier of this link.
    pub fn link_id(&self) -> LinkId {
        self.link_id
    }

    /// Identifier of the transport carrying this link.
    pub fn transport_id(&self) -> TransportId {
        self.transport_id
    }

    /// Address of the remote end.
    pub fn remote_addr(&self) -> &TransportAddr {
        &self.remote_addr
    }

    /// Direction of the link.
    pub fn direction(&self) -> LinkDirection {
        self.direction
    }

    /// Current lifecycle state.
    pub fn state(&self) -> LinkState {
        self.state
    }

    /// Configured baseline RTT.
    pub fn base_rtt(&self) -> Duration {
        self.base_rtt
    }

    /// Read-only view of the link statistics.
    pub fn stats(&self) -> &LinkStats {
        &self.stats
    }

    /// Mutable access to the link statistics.
    pub fn stats_mut(&mut self) -> &mut LinkStats {
        &mut self.stats
    }

    /// Creation timestamp (`0` when unset).
    pub fn created_at(&self) -> u64 {
        self.created_at
    }

    /// Overwrites the creation timestamp.
    pub fn set_created_at(&mut self, timestamp: u64) {
        self.created_at = timestamp;
    }

    /// Moves the link to `Connected`.
    pub fn set_connected(&mut self) {
        self.state = LinkState::Connected;
    }

    /// Moves the link to `Disconnected`.
    pub fn set_disconnected(&mut self) {
        self.state = LinkState::Disconnected;
    }

    /// Moves the link to `Failed`.
    pub fn set_failed(&mut self) {
        self.state = LinkState::Failed;
    }

    /// `true` when the current state is operational.
    pub fn is_operational(&self) -> bool {
        self.state().is_operational()
    }

    /// `true` when the current state is terminal.
    pub fn is_terminal(&self) -> bool {
        self.state().is_terminal()
    }

    /// Measured RTT when one exists, otherwise the baseline RTT.
    pub fn effective_rtt(&self) -> Duration {
        match self.stats.rtt_estimate() {
            Some(measured) => measured,
            None => self.base_rtt,
        }
    }

    /// Time elapsed since creation, in the same unit as `created_at`.
    ///
    /// Returns `0` when the creation timestamp is unset or lies after
    /// `current_time_ms`.
    pub fn age(&self, current_time_ms: u64) -> u64 {
        match self.created_at {
            0 => 0,
            born => current_time_ms.saturating_sub(born),
        }
    }
}
/// A peer reported by a transport's discovery.
#[derive(Clone, Debug)]
pub struct DiscoveredPeer {
    /// Transport that reported the peer.
    pub transport_id: TransportId,
    /// Address at which the peer was seen.
    pub addr: TransportAddr,
    /// Optional public-key hint for the peer.
    pub pubkey_hint: Option<XOnlyPublicKey>,
}

impl DiscoveredPeer {
    /// Creates a discovered peer without a public-key hint.
    pub fn new(transport_id: TransportId, addr: TransportAddr) -> Self {
        DiscoveredPeer {
            pubkey_hint: None,
            transport_id,
            addr,
        }
    }

    /// Creates a discovered peer carrying `pubkey` as its hint.
    pub fn with_hint(
        transport_id: TransportId,
        addr: TransportAddr,
        pubkey: XOnlyPublicKey,
    ) -> Self {
        DiscoveredPeer {
            pubkey_hint: Some(pubkey),
            ..Self::new(transport_id, addr)
        }
    }
}
/// Synchronous interface implemented by transports.
///
/// Required methods cover identity, state, MTU, lifecycle, sending and
/// discovery; the remaining methods have default implementations.
pub trait Transport {
/// Identifier of this transport.
fn transport_id(&self) -> TransportId;
/// Static description of the transport kind.
fn transport_type(&self) -> &TransportType;
/// Current lifecycle state.
fn state(&self) -> TransportState;
/// Transport-wide MTU.
fn mtu(&self) -> u16;
/// MTU for the link to `addr`.
///
/// The default ignores `addr` and returns [`Transport::mtu`].
fn link_mtu(&self, addr: &TransportAddr) -> u16 {
let _ = addr;
self.mtu()
}
/// Starts the transport.
fn start(&mut self) -> Result<(), TransportError>;
/// Stops the transport.
fn stop(&mut self) -> Result<(), TransportError>;
/// Sends `data` to `addr`.
fn send(&self, addr: &TransportAddr, data: &[u8]) -> Result<(), TransportError>;
/// Returns the peers currently known through discovery.
fn discover(&self) -> Result<Vec<DiscoveredPeer>, TransportError>;
/// Auto-connect flag; defaults to `false`.
/// NOTE(review): presumably controls connecting to discovered peers - confirm with callers.
fn auto_connect(&self) -> bool {
false
}
/// Accept-connections flag; defaults to `true`.
/// NOTE(review): presumably controls accepting inbound connections - confirm with callers.
fn accept_connections(&self) -> bool {
true
}
/// Closes the connection to `_addr`; the default does nothing.
fn close_connection(&self, _addr: &TransportAddr) {
}
}
/// Connection status for one remote address, as returned by
/// `TransportHandle::connection_state`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ConnectionState {
/// No connection state for the address.
None,
/// A connection attempt is in progress.
Connecting,
/// The connection is established.
Connected,
/// The connection attempt failed; the string carries the reason.
Failed(String),
}
/// Congestion indicators reported by a transport.
///
/// `TransportHandle::congestion` returns the default value (all fields
/// `None`) for every variant except UDP.
#[derive(Clone, Debug, Default)]
pub struct TransportCongestion {
/// Receive-drop counter, when the transport reports one.
/// NOTE(review): presumably a cumulative count of dropped inbound packets - confirm against the UDP transport.
pub recv_drops: Option<u64>,
}
/// Owns one concrete transport and dispatches operations to it.
///
/// Some variants are only compiled on certain targets or with the
/// `sim-transport` feature, as shown by the `cfg` attributes.
pub enum TransportHandle {
/// UDP transport.
Udp(UdpTransport),
/// Simulated transport (requires the `sim-transport` feature).
#[cfg(feature = "sim-transport")]
Sim(SimTransport),
/// Ethernet transport (Linux and macOS only).
#[cfg(any(target_os = "linux", target_os = "macos"))]
Ethernet(EthernetTransport),
/// TCP transport.
Tcp(TcpTransport),
/// Tor transport.
Tor(TorTransport),
/// BLE transport (Linux only).
#[cfg(target_os = "linux")]
Ble(DefaultBleTransport),
}
// Every method below matches on the variant and forwards to the wrapped
// transport; arms are gated by the same `cfg` attributes as the variants.
impl TransportHandle {
/// Starts the wrapped transport by awaiting its `start_async`.
pub async fn start(&mut self) -> Result<(), TransportError> {
match self {
TransportHandle::Udp(t) => t.start_async().await,
#[cfg(feature = "sim-transport")]
TransportHandle::Sim(t) => t.start_async().await,
#[cfg(any(target_os = "linux", target_os = "macos"))]
TransportHandle::Ethernet(t) => t.start_async().await,
TransportHandle::Tcp(t) => t.start_async().await,
TransportHandle::Tor(t) => t.start_async().await,
#[cfg(target_os = "linux")]
TransportHandle::Ble(t) => t.start_async().await,
}
}
/// Stops the wrapped transport by awaiting its `stop_async`.
pub async fn stop(&mut self) -> Result<(), TransportError> {
match self {
TransportHandle::Udp(t) => t.stop_async().await,
#[cfg(feature = "sim-transport")]
TransportHandle::Sim(t) => t.stop_async().await,
#[cfg(any(target_os = "linux", target_os = "macos"))]
TransportHandle::Ethernet(t) => t.stop_async().await,
TransportHandle::Tcp(t) => t.stop_async().await,
TransportHandle::Tor(t) => t.stop_async().await,
#[cfg(target_os = "linux")]
TransportHandle::Ble(t) => t.stop_async().await,
}
}
/// Sends `data` to `addr` via the wrapped transport's `send_async`.
///
/// The `usize` on success is whatever `send_async` returns
/// (presumably the number of bytes sent - confirm per transport).
pub async fn send(&self, addr: &TransportAddr, data: &[u8]) -> Result<usize, TransportError> {
match self {
TransportHandle::Udp(t) => t.send_async(addr, data).await,
#[cfg(feature = "sim-transport")]
TransportHandle::Sim(t) => t.send_async(addr, data).await,
#[cfg(any(target_os = "linux", target_os = "macos"))]
TransportHandle::Ethernet(t) => t.send_async(addr, data).await,
TransportHandle::Tcp(t) => t.send_async(addr, data).await,
TransportHandle::Tor(t) => t.send_async(addr, data).await,
#[cfg(target_os = "linux")]
TransportHandle::Ble(t) => t.send_async(addr, data).await,
}
}
/// Flushes pending sends on the UDP transport; a no-op for every
/// other variant.
pub async fn flush_pending_send(&self) {
if let TransportHandle::Udp(t) = self {
t.flush_pending_send().await;
}
}
/// Identifier of the wrapped transport.
pub fn transport_id(&self) -> TransportId {
match self {
TransportHandle::Udp(t) => t.transport_id(),
#[cfg(feature = "sim-transport")]
TransportHandle::Sim(t) => t.transport_id(),
#[cfg(any(target_os = "linux", target_os = "macos"))]
TransportHandle::Ethernet(t) => t.transport_id(),
TransportHandle::Tcp(t) => t.transport_id(),
TransportHandle::Tor(t) => t.transport_id(),
#[cfg(target_os = "linux")]
TransportHandle::Ble(t) => t.transport_id(),
}
}
/// Optional name of the wrapped transport, as reported by its `name()`.
pub fn name(&self) -> Option<&str> {
match self {
TransportHandle::Udp(t) => t.name(),
#[cfg(feature = "sim-transport")]
TransportHandle::Sim(t) => t.name(),
#[cfg(any(target_os = "linux", target_os = "macos"))]
TransportHandle::Ethernet(t) => t.name(),
TransportHandle::Tcp(t) => t.name(),
TransportHandle::Tor(t) => t.name(),
#[cfg(target_os = "linux")]
TransportHandle::Ble(t) => t.name(),
}
}
/// Static type description of the wrapped transport.
pub fn transport_type(&self) -> &TransportType {
match self {
TransportHandle::Udp(t) => t.transport_type(),
#[cfg(feature = "sim-transport")]
TransportHandle::Sim(t) => t.transport_type(),
#[cfg(any(target_os = "linux", target_os = "macos"))]
TransportHandle::Ethernet(t) => t.transport_type(),
TransportHandle::Tcp(t) => t.transport_type(),
TransportHandle::Tor(t) => t.transport_type(),
#[cfg(target_os = "linux")]
TransportHandle::Ble(t) => t.transport_type(),
}
}
/// Current lifecycle state of the wrapped transport.
pub fn state(&self) -> TransportState {
match self {
TransportHandle::Udp(t) => t.state(),
#[cfg(feature = "sim-transport")]
TransportHandle::Sim(t) => t.state(),
#[cfg(any(target_os = "linux", target_os = "macos"))]
TransportHandle::Ethernet(t) => t.state(),
TransportHandle::Tcp(t) => t.state(),
TransportHandle::Tor(t) => t.state(),
#[cfg(target_os = "linux")]
TransportHandle::Ble(t) => t.state(),
}
}
/// Transport-wide MTU of the wrapped transport.
pub fn mtu(&self) -> u16 {
match self {
TransportHandle::Udp(t) => t.mtu(),
#[cfg(feature = "sim-transport")]
TransportHandle::Sim(t) => t.mtu(),
#[cfg(any(target_os = "linux", target_os = "macos"))]
TransportHandle::Ethernet(t) => t.mtu(),
TransportHandle::Tcp(t) => t.mtu(),
TransportHandle::Tor(t) => t.mtu(),
#[cfg(target_os = "linux")]
TransportHandle::Ble(t) => t.mtu(),
}
}
/// MTU for the link to `addr`, as reported by the wrapped transport.
pub fn link_mtu(&self, addr: &TransportAddr) -> u16 {
match self {
TransportHandle::Udp(t) => t.link_mtu(addr),
#[cfg(feature = "sim-transport")]
TransportHandle::Sim(t) => t.link_mtu(addr),
#[cfg(any(target_os = "linux", target_os = "macos"))]
TransportHandle::Ethernet(t) => t.link_mtu(addr),
TransportHandle::Tcp(t) => t.link_mtu(addr),
TransportHandle::Tor(t) => t.link_mtu(addr),
#[cfg(target_os = "linux")]
TransportHandle::Ble(t) => t.link_mtu(addr),
}
}
/// Local socket address for UDP and TCP transports; `None` for all
/// other variants.
pub fn local_addr(&self) -> Option<std::net::SocketAddr> {
match self {
TransportHandle::Udp(t) => t.local_addr(),
#[cfg(feature = "sim-transport")]
TransportHandle::Sim(_) => None,
#[cfg(any(target_os = "linux", target_os = "macos"))]
TransportHandle::Ethernet(_) => None,
TransportHandle::Tcp(t) => t.local_addr(),
TransportHandle::Tor(_) => None,
#[cfg(target_os = "linux")]
TransportHandle::Ble(_) => None,
}
}
/// Interface name for the Ethernet transport; `None` for all other
/// variants.
pub fn interface_name(&self) -> Option<&str> {
match self {
TransportHandle::Udp(_) => None,
#[cfg(feature = "sim-transport")]
TransportHandle::Sim(_) => None,
#[cfg(any(target_os = "linux", target_os = "macos"))]
TransportHandle::Ethernet(t) => Some(t.interface_name()),
TransportHandle::Tcp(_) => None,
TransportHandle::Tor(_) => None,
#[cfg(target_os = "linux")]
TransportHandle::Ble(_) => None,
}
}
/// Onion address reported by the Tor transport; `None` for non-Tor
/// variants.
pub fn onion_address(&self) -> Option<&str> {
match self {
TransportHandle::Tor(t) => t.onion_address(),
_ => None,
}
}
/// Cached monitoring info from the Tor transport; `None` for non-Tor
/// variants.
pub fn tor_monitoring(&self) -> Option<TorMonitoringInfo> {
match self {
TransportHandle::Tor(t) => t.cached_monitoring(),
_ => None,
}
}
/// Mode string of the Tor transport; `None` for non-Tor variants.
pub fn tor_mode(&self) -> Option<&str> {
match self {
TransportHandle::Tor(t) => Some(t.mode()),
_ => None,
}
}
/// Peers reported by the wrapped transport's `discover()`.
pub fn discover(&self) -> Result<Vec<DiscoveredPeer>, TransportError> {
match self {
TransportHandle::Udp(t) => t.discover(),
#[cfg(feature = "sim-transport")]
TransportHandle::Sim(t) => t.discover(),
#[cfg(any(target_os = "linux", target_os = "macos"))]
TransportHandle::Ethernet(t) => t.discover(),
TransportHandle::Tcp(t) => t.discover(),
TransportHandle::Tor(t) => t.discover(),
#[cfg(target_os = "linux")]
TransportHandle::Ble(t) => t.discover(),
}
}
/// Auto-connect flag of the wrapped transport.
pub fn auto_connect(&self) -> bool {
match self {
TransportHandle::Udp(t) => t.auto_connect(),
#[cfg(feature = "sim-transport")]
TransportHandle::Sim(t) => t.auto_connect(),
#[cfg(any(target_os = "linux", target_os = "macos"))]
TransportHandle::Ethernet(t) => t.auto_connect(),
TransportHandle::Tcp(t) => t.auto_connect(),
TransportHandle::Tor(t) => t.auto_connect(),
#[cfg(target_os = "linux")]
TransportHandle::Ble(t) => t.auto_connect(),
}
}
/// Accept-connections flag of the wrapped transport.
pub fn accept_connections(&self) -> bool {
match self {
TransportHandle::Udp(t) => t.accept_connections(),
#[cfg(feature = "sim-transport")]
TransportHandle::Sim(t) => t.accept_connections(),
#[cfg(any(target_os = "linux", target_os = "macos"))]
TransportHandle::Ethernet(t) => t.accept_connections(),
TransportHandle::Tcp(t) => t.accept_connections(),
TransportHandle::Tor(t) => t.accept_connections(),
#[cfg(target_os = "linux")]
TransportHandle::Ble(t) => t.accept_connections(),
}
}
/// Establishes a connection to `addr`.
///
/// UDP, Sim and Ethernet return `Ok(())` immediately without doing
/// anything; TCP, Tor and BLE await the transport's `connect_async`.
pub async fn connect(&self, addr: &TransportAddr) -> Result<(), TransportError> {
match self {
TransportHandle::Udp(_) => Ok(()), #[cfg(feature = "sim-transport")]
TransportHandle::Sim(_) => Ok(()), #[cfg(any(target_os = "linux", target_os = "macos"))]
TransportHandle::Ethernet(_) => Ok(()), TransportHandle::Tcp(t) => t.connect_async(addr).await,
TransportHandle::Tor(t) => t.connect_async(addr).await,
#[cfg(target_os = "linux")]
TransportHandle::Ble(t) => t.connect_async(addr).await,
}
}
/// Connection state for `addr`.
///
/// UDP, Sim and Ethernet always report `Connected`; TCP, Tor and BLE
/// ask the transport via `connection_state_sync`.
pub fn connection_state(&self, addr: &TransportAddr) -> ConnectionState {
match self {
TransportHandle::Udp(_) => ConnectionState::Connected,
#[cfg(feature = "sim-transport")]
TransportHandle::Sim(_) => ConnectionState::Connected,
#[cfg(any(target_os = "linux", target_os = "macos"))]
TransportHandle::Ethernet(_) => ConnectionState::Connected,
TransportHandle::Tcp(t) => t.connection_state_sync(addr),
TransportHandle::Tor(t) => t.connection_state_sync(addr),
#[cfg(target_os = "linux")]
TransportHandle::Ble(t) => t.connection_state_sync(addr),
}
}
/// Closes the connection to `addr`.
///
/// UDP, Sim and Ethernet call the synchronous `close_connection`;
/// TCP, Tor and BLE await `close_connection_async`.
pub async fn close_connection(&self, addr: &TransportAddr) {
match self {
TransportHandle::Udp(t) => t.close_connection(addr),
#[cfg(feature = "sim-transport")]
TransportHandle::Sim(t) => t.close_connection(addr),
#[cfg(any(target_os = "linux", target_os = "macos"))]
TransportHandle::Ethernet(t) => t.close_connection(addr),
TransportHandle::Tcp(t) => t.close_connection_async(addr).await,
TransportHandle::Tor(t) => t.close_connection_async(addr).await,
#[cfg(target_os = "linux")]
TransportHandle::Ble(t) => t.close_connection_async(addr).await,
}
}
/// `true` when the wrapped transport's state is operational.
pub fn is_operational(&self) -> bool {
self.state().is_operational()
}
/// Congestion indicators; only UDP reports real values, every other
/// variant returns `TransportCongestion::default()`.
pub fn congestion(&self) -> TransportCongestion {
match self {
TransportHandle::Udp(t) => t.congestion(),
#[cfg(feature = "sim-transport")]
TransportHandle::Sim(_) => TransportCongestion::default(),
#[cfg(any(target_os = "linux", target_os = "macos"))]
TransportHandle::Ethernet(_) => TransportCongestion::default(),
TransportHandle::Tcp(_) => TransportCongestion::default(),
TransportHandle::Tor(_) => TransportCongestion::default(),
#[cfg(target_os = "linux")]
TransportHandle::Ble(_) => TransportCongestion::default(),
}
}
/// Transport statistics as a JSON value.
///
/// Most variants serialize their stats with `serde_json::to_value`; if
/// that fails the result is the default `Value` (JSON `null`). The
/// Ethernet arm instead builds the object field by field from its
/// snapshot.
pub fn transport_stats(&self) -> serde_json::Value {
match self {
TransportHandle::Udp(t) => {
serde_json::to_value(t.stats().snapshot()).unwrap_or_default()
}
#[cfg(feature = "sim-transport")]
TransportHandle::Sim(t) => serde_json::to_value(t.stats()).unwrap_or_default(),
#[cfg(any(target_os = "linux", target_os = "macos"))]
TransportHandle::Ethernet(t) => {
let snap = t.stats().snapshot();
serde_json::json!({
"frames_sent": snap.frames_sent,
"frames_recv": snap.frames_recv,
"bytes_sent": snap.bytes_sent,
"bytes_recv": snap.bytes_recv,
"send_errors": snap.send_errors,
"recv_errors": snap.recv_errors,
"beacons_sent": snap.beacons_sent,
"beacons_recv": snap.beacons_recv,
"frames_too_short": snap.frames_too_short,
"frames_too_long": snap.frames_too_long,
})
}
TransportHandle::Tcp(t) => {
serde_json::to_value(t.stats().snapshot()).unwrap_or_default()
}
TransportHandle::Tor(t) => {
serde_json::to_value(t.stats().snapshot()).unwrap_or_default()
}
#[cfg(target_os = "linux")]
TransportHandle::Ble(t) => {
serde_json::to_value(t.stats().snapshot()).unwrap_or_default()
}
}
}
}
/// Turns a textual transport address into a `SocketAddr`.
///
/// The address is first parsed as a literal socket address; only if that
/// fails is it passed to `tokio::net::lookup_host`, whose first result is
/// returned.
///
/// # Errors
///
/// Returns `TransportError::InvalidAddress` when the address is not valid
/// UTF-8, when the lookup fails, or when the lookup yields no addresses.
pub(crate) async fn resolve_socket_addr(
    addr: &TransportAddr,
) -> Result<SocketAddr, TransportError> {
    let text = match addr.as_str() {
        Some(text) => text,
        None => return Err(TransportError::InvalidAddress("not valid UTF-8".into())),
    };

    // Fast path: a literal address needs no lookup.
    if let Ok(literal) = text.parse::<SocketAddr>() {
        return Ok(literal);
    }

    let mut candidates = match tokio::net::lookup_host(text).await {
        Ok(found) => found,
        Err(e) => {
            return Err(TransportError::InvalidAddress(format!(
                "DNS resolution failed for {}: {}",
                text, e
            )));
        }
    };

    match candidates.next() {
        Some(first) => Ok(first),
        None => Err(TransportError::InvalidAddress(format!(
            "DNS resolution returned no addresses for {}",
            text
        ))),
    }
}
// Unit tests for the value types, link bookkeeping, the `Transport` trait
// defaults and `TransportHandle` MTU delegation defined in this module.
#[cfg(test)]
mod tests {
use super::*;
// TransportId round-trips its value and displays as `transport:<id>`.
#[test]
fn test_transport_id() {
let id = TransportId::new(42);
assert_eq!(id.as_u32(), 42);
assert_eq!(format!("{}", id), "transport:42");
}
// LinkId round-trips its value and displays as `link:<id>`.
#[test]
fn test_link_id() {
let id = LinkId::new(12345);
assert_eq!(id.as_u64(), 12345);
assert_eq!(format!("{}", id), "link:12345");
}
// Which transport states allow starting and which count as operational.
#[test]
fn test_transport_state_transitions() {
assert!(TransportState::Configured.can_start());
assert!(TransportState::Down.can_start());
assert!(TransportState::Failed.can_start());
assert!(!TransportState::Starting.can_start());
assert!(!TransportState::Up.can_start());
assert!(TransportState::Up.is_operational());
assert!(!TransportState::Starting.is_operational());
assert!(!TransportState::Failed.is_operational());
}
// Only `Connected` is operational; `Disconnected` and `Failed` are terminal.
#[test]
fn test_link_state() {
assert!(LinkState::Connected.is_operational());
assert!(!LinkState::Connecting.is_operational());
assert!(!LinkState::Disconnected.is_operational());
assert!(!LinkState::Failed.is_operational());
assert!(LinkState::Disconnected.is_terminal());
assert!(LinkState::Failed.is_terminal());
assert!(!LinkState::Connected.is_terminal());
}
// Spot-checks the flags and names of the predefined transport types.
// The clippy allow is needed because the assertions are on constants.
#[test]
#[allow(clippy::assertions_on_constants)]
fn test_transport_type_constants() {
assert!(!TransportType::UDP.connection_oriented);
assert!(!TransportType::UDP.reliable);
assert!(TransportType::UDP.is_connectionless());
assert!(TransportType::TOR.connection_oriented);
assert!(TransportType::TOR.reliable);
assert!(!TransportType::TOR.is_connectionless());
assert_eq!(TransportType::UDP.name, "udp");
assert_eq!(TransportType::ETHERNET.name, "ethernet");
}
// A UTF-8 address displays and reads back as the original text.
#[test]
fn test_transport_addr_string() {
let addr = TransportAddr::from_string("192.168.1.1:2121");
assert_eq!(format!("{}", addr), "192.168.1.1:2121");
assert_eq!(addr.as_str(), Some("192.168.1.1:2121"));
}
// A non-UTF-8 address displays as lowercase hex and has no string form.
#[test]
fn test_transport_addr_binary() {
let binary = TransportAddr::new(vec![0xff, 0x80, 0x2b, 0x3c, 0x4d, 0x5e]);
assert_eq!(format!("{}", binary), "ff802b3c4d5e");
assert!(binary.as_str().is_none());
assert_eq!(binary.len(), 6);
}
// `From<&str>` and `From<String>` conversions.
#[test]
fn test_transport_addr_from_string() {
let addr: TransportAddr = "test:1234".into();
assert_eq!(addr.as_str(), Some("test:1234"));
let addr2: TransportAddr = String::from("hello").into();
assert_eq!(addr2.as_str(), Some("hello"));
}
// Send/receive counters and the last-receive timestamp.
#[test]
fn test_link_stats_basic() {
let mut stats = LinkStats::new();
stats.record_sent(100);
stats.record_recv(200, 1000);
assert_eq!(stats.packets_sent, 1);
assert_eq!(stats.bytes_sent, 100);
assert_eq!(stats.packets_recv, 1);
assert_eq!(stats.bytes_recv, 200);
assert_eq!(stats.last_recv_ms, 1000);
}
// First RTT sample is stored as-is; the second is blended (0.2 weight),
// so 100 ms then 200 ms lands near 120 ms.
#[test]
fn test_link_stats_rtt() {
let mut stats = LinkStats::new();
assert!(stats.rtt_estimate().is_none());
stats.update_rtt(Duration::from_millis(100));
assert_eq!(stats.rtt_estimate(), Some(Duration::from_millis(100)));
stats.update_rtt(Duration::from_millis(200));
let rtt = stats.rtt_estimate().unwrap();
assert!(rtt.as_millis() >= 110 && rtt.as_millis() <= 130);
}
// `time_since_recv` is `u64::MAX` before any receive, then a saturating
// difference.
#[test]
fn test_link_stats_time_since_recv() {
let mut stats = LinkStats::new();
assert_eq!(stats.time_since_recv(1000), u64::MAX);
stats.record_recv(100, 500);
assert_eq!(stats.time_since_recv(1000), 500);
assert_eq!(stats.time_since_recv(500), 0);
}
// A new link starts in `Connecting` and is not operational.
#[test]
fn test_link_creation() {
let link = Link::new(
LinkId::new(1),
TransportId::new(1),
TransportAddr::from_string("test"),
LinkDirection::Outbound,
Duration::from_millis(50),
);
assert_eq!(link.state(), LinkState::Connecting);
assert!(!link.is_operational());
assert_eq!(link.direction(), LinkDirection::Outbound);
}
// A connectionless link starts out `Connected`.
#[test]
fn test_link_connectionless() {
let link = Link::connectionless(
LinkId::new(1),
TransportId::new(1),
TransportAddr::from_string("test"),
LinkDirection::Inbound,
Duration::from_millis(5),
);
assert_eq!(link.state(), LinkState::Connected);
assert!(link.is_operational());
}
// State setters drive `is_operational` / `is_terminal`.
#[test]
fn test_link_state_changes() {
let mut link = Link::new(
LinkId::new(1),
TransportId::new(1),
TransportAddr::from_string("test"),
LinkDirection::Outbound,
Duration::from_millis(50),
);
assert!(!link.is_operational());
link.set_connected();
assert!(link.is_operational());
assert!(!link.is_terminal());
link.set_disconnected();
assert!(!link.is_operational());
assert!(link.is_terminal());
}
// Effective RTT is the base RTT until a measurement is recorded.
#[test]
fn test_link_effective_rtt() {
let mut link = Link::connectionless(
LinkId::new(1),
TransportId::new(1),
TransportAddr::from_string("test"),
LinkDirection::Inbound,
Duration::from_millis(50),
);
assert_eq!(link.effective_rtt(), Duration::from_millis(50));
link.stats_mut().update_rtt(Duration::from_millis(100));
assert_eq!(link.effective_rtt(), Duration::from_millis(100));
}
// Age is 0 while the creation time is unset, then a saturating difference.
#[test]
fn test_link_age() {
let mut link = Link::new(
LinkId::new(1),
TransportId::new(1),
TransportAddr::from_string("test"),
LinkDirection::Outbound,
Duration::from_millis(50),
);
assert_eq!(link.age(1000), 0);
link.set_created_at(500);
assert_eq!(link.age(1000), 500);
assert_eq!(link.age(500), 0);
}
// `DiscoveredPeer::new` leaves the public-key hint empty.
#[test]
fn test_discovered_peer() {
let peer = DiscoveredPeer::new(
TransportId::new(1),
TransportAddr::from_string("192.168.1.1:2121"),
);
assert_eq!(peer.transport_id, TransportId::new(1));
assert!(peer.pubkey_hint.is_none());
}
// Display output of link directions.
#[test]
fn test_link_direction_display() {
assert_eq!(format!("{}", LinkDirection::Outbound), "outbound");
assert_eq!(format!("{}", LinkDirection::Inbound), "inbound");
}
// Display output of transport states.
#[test]
fn test_transport_state_display() {
assert_eq!(format!("{}", TransportState::Up), "up");
assert_eq!(format!("{}", TransportState::Failed), "failed");
}
// `ReceivedPacket::new` keeps its inputs and stamps a non-zero time.
#[test]
fn test_received_packet() {
let packet = ReceivedPacket::new(
TransportId::new(1),
TransportAddr::from_string("192.168.1.1:2121"),
vec![1, 2, 3, 4],
);
assert_eq!(packet.transport_id, TransportId::new(1));
assert_eq!(packet.data, vec![1, 2, 3, 4]);
assert!(packet.timestamp_ms > 0);
}
// `with_timestamp` stores the caller-supplied timestamp unchanged.
#[test]
fn test_received_packet_with_timestamp() {
let packet = ReceivedPacket::with_timestamp(
TransportId::new(1),
TransportAddr::from_string("test"),
vec![5, 6],
12345,
);
assert_eq!(packet.timestamp_ms, 12345);
}
// A packet sent on the channel is received with its data intact.
#[tokio::test]
async fn test_packet_channel() {
let (tx, mut rx) = packet_channel(10);
let packet = ReceivedPacket::new(
TransportId::new(1),
TransportAddr::from_string("test"),
vec![1, 2, 3],
);
tx.send(packet.clone()).unwrap();
let received = rx.recv().await.unwrap();
assert_eq!(received.data, vec![1, 2, 3]);
}
// Minimal `Transport` that relies on the trait's default `link_mtu`.
struct MockTransport {
id: TransportId,
mtu_value: u16,
}
impl MockTransport {
fn new(mtu: u16) -> Self {
Self {
id: TransportId::new(99),
mtu_value: mtu,
}
}
}
impl Transport for MockTransport {
fn transport_id(&self) -> TransportId {
self.id
}
fn transport_type(&self) -> &TransportType {
&TransportType::UDP
}
fn state(&self) -> TransportState {
TransportState::Up
}
fn mtu(&self) -> u16 {
self.mtu_value
}
fn start(&mut self) -> Result<(), TransportError> {
Ok(())
}
fn stop(&mut self) -> Result<(), TransportError> {
Ok(())
}
fn send(&self, _addr: &TransportAddr, _data: &[u8]) -> Result<(), TransportError> {
Ok(())
}
fn discover(&self) -> Result<Vec<DiscoveredPeer>, TransportError> {
Ok(vec![])
}
}
// `Transport` that overrides `link_mtu` with a per-address table and
// falls back to the default MTU for unknown addresses.
struct PerLinkMtuTransport {
id: TransportId,
default_mtu: u16,
overrides: Vec<(TransportAddr, u16)>,
}
impl PerLinkMtuTransport {
fn new(default_mtu: u16, overrides: Vec<(TransportAddr, u16)>) -> Self {
Self {
id: TransportId::new(100),
default_mtu,
overrides,
}
}
}
impl Transport for PerLinkMtuTransport {
fn transport_id(&self) -> TransportId {
self.id
}
fn transport_type(&self) -> &TransportType {
&TransportType::UDP
}
fn state(&self) -> TransportState {
TransportState::Up
}
fn mtu(&self) -> u16 {
self.default_mtu
}
fn link_mtu(&self, addr: &TransportAddr) -> u16 {
for (a, mtu) in &self.overrides {
if a == addr {
return *mtu;
}
}
self.mtu()
}
fn start(&mut self) -> Result<(), TransportError> {
Ok(())
}
fn stop(&mut self) -> Result<(), TransportError> {
Ok(())
}
fn send(&self, _addr: &TransportAddr, _data: &[u8]) -> Result<(), TransportError> {
Ok(())
}
fn discover(&self) -> Result<Vec<DiscoveredPeer>, TransportError> {
Ok(vec![])
}
}
// The default `link_mtu` returns the transport MTU for any address.
#[test]
fn test_link_mtu_default_falls_back_to_mtu() {
let transport = MockTransport::new(1280);
let addr = TransportAddr::from_string("192.168.1.1:2121");
assert_eq!(transport.link_mtu(&addr), 1280);
assert_eq!(transport.link_mtu(&addr), transport.mtu());
let other_addr = TransportAddr::from_string("10.0.0.1:5000");
assert_eq!(transport.link_mtu(&other_addr), 1280);
}
// An overriding `link_mtu` returns per-address values and the default
// MTU otherwise.
#[test]
fn test_link_mtu_per_link_override() {
let addr_a = TransportAddr::from_string("192.168.1.1:2121");
let addr_b = TransportAddr::from_string("10.0.0.1:5000");
let addr_unknown = TransportAddr::from_string("172.16.0.1:6000");
let transport =
PerLinkMtuTransport::new(1280, vec![(addr_a.clone(), 512), (addr_b.clone(), 247)]);
assert_eq!(transport.link_mtu(&addr_a), 512);
assert_eq!(transport.link_mtu(&addr_b), 247);
assert_eq!(transport.link_mtu(&addr_unknown), 1280);
assert_eq!(transport.mtu(), 1280);
}
// `TransportHandle::link_mtu` forwards to the wrapped UDP transport and
// agrees with the configured MTU.
#[test]
fn test_transport_handle_link_mtu_delegation() {
use crate::config::UdpConfig;
use crate::transport::udp::UdpTransport;
let config = UdpConfig::default();
let expected_mtu = config.mtu();
let (tx, _rx) = packet_channel(1);
let transport = UdpTransport::new(TransportId::new(1), None, config, tx);
let handle = TransportHandle::Udp(transport);
let addr = TransportAddr::from_string("192.168.1.1:2121");
assert_eq!(handle.link_mtu(&addr), expected_mtu);
assert_eq!(handle.link_mtu(&addr), handle.mtu());
}
}