use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::{borrow::Cow, io, net::SocketAddr};
use futures::Future;
use tokio::net::UdpSocket;
/// Latched flag: `true` from the first call to [`signal_version_mismatch`]
/// until [`clear_version_mismatch`] lowers it again.
static VERSION_MISMATCH_DETECTED: AtomicBool = AtomicBool::new(false);

/// Incremented once per call to [`signal_version_mismatch`]. Nothing in the
/// code shown here ever resets it, so it survives `clear_version_mismatch`.
static VERSION_MISMATCH_GENERATION: AtomicU64 = AtomicU64::new(0);

/// Records that a version mismatch was observed.
///
/// Raises the mismatch flag and then bumps the generation counter by one.
pub fn signal_version_mismatch() {
    // The flag is written before the counter so that anyone who notices the
    // new generation will also find the flag set.
    VERSION_MISMATCH_DETECTED.store(true, Ordering::SeqCst);
    let _previous_generation = VERSION_MISMATCH_GENERATION.fetch_add(1, Ordering::SeqCst);
}

/// Returns how many times a version mismatch has been signalled so far.
pub fn version_mismatch_generation() -> u64 {
    let generation = VERSION_MISMATCH_GENERATION.load(Ordering::SeqCst);
    generation
}

/// Returns `true` while the mismatch flag is raised.
pub fn has_version_mismatch() -> bool {
    let flagged = VERSION_MISMATCH_DETECTED.load(Ordering::SeqCst);
    flagged
}

/// Lowers the mismatch flag. The generation counter is left untouched.
pub fn clear_version_mismatch() {
    let raised = false;
    VERSION_MISMATCH_DETECTED.store(raised, Ordering::SeqCst);
}
/// Highest version reported so far together with its report tally, or `None`
/// until the first call to [`report_peer_version`].
static HIGHEST_SEEN_VERSION: Mutex<Option<HighestSeenVersion>> = Mutex::new(None);

/// Flag raised by `signal_urgent_update` and lowered by `clear_urgent_update`.
static URGENT_UPDATE_NEEDED: AtomicBool = AtomicBool::new(false);

/// Number of reports of the same version required before
/// [`get_highest_seen_version`] is willing to return it.
const MIN_VERSION_REPORTERS: usize = 2;

/// The best version seen so far and how many reports matched it.
#[derive(Clone, Copy)]
struct HighestSeenVersion {
    /// Version triple; compared lexicographically as a tuple.
    version: (u8, u8, u16),
    /// Number of `report_peer_version` calls that carried exactly `version`
    /// since it became the highest. Calls are counted, not distinct peers.
    reporter_count: usize,
}

/// Feeds one reported peer version into the "highest seen" tracker.
///
/// * A version higher than the tracked one (or the very first report)
///   replaces it and restarts the tally at 1.
/// * A version equal to the tracked one increments the tally (saturating).
/// * A lower version is ignored.
///
/// A poisoned lock is recovered rather than propagated.
pub fn report_peer_version(version: (u8, u8, u16)) {
    let mut slot = HIGHEST_SEEN_VERSION
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());

    let next = match *slot {
        // Older than what we already track: keep the current entry as is.
        Some(best) if version < best.version => best,
        // Another report of the tracked version: one more tally mark.
        Some(best) if version == best.version => HighestSeenVersion {
            reporter_count: best.reporter_count.saturating_add(1),
            ..best
        },
        // First report ever, or a strictly newer version: start over.
        _ => HighestSeenVersion {
            version,
            reporter_count: 1,
        },
    };
    *slot = Some(next);
}

/// Returns the highest reported version, but only once it has been reported
/// at least [`MIN_VERSION_REPORTERS`] times; otherwise `None`.
pub fn get_highest_seen_version() -> Option<(u8, u8, u16)> {
    // Copy the entry out so the lock is released before it is inspected.
    let snapshot = *HIGHEST_SEEN_VERSION
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());

    snapshot
        .filter(|entry| entry.reporter_count >= MIN_VERSION_REPORTERS)
        .map(|entry| entry.version)
}

/// Test-only helper that forgets every reported version.
#[cfg(test)]
fn reset_version_discovery() {
    HIGHEST_SEEN_VERSION
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
        .take();
}
/// Marks an update as urgent.
///
/// Raises the urgent-update flag and then also signals a version mismatch
/// through `signal_version_mismatch`.
pub fn signal_urgent_update() {
    let urgent = true;
    URGENT_UPDATE_NEEDED.store(urgent, Ordering::SeqCst);
    signal_version_mismatch();
}

/// Returns `true` while the urgent-update flag is raised.
pub fn is_urgent_update() -> bool {
    let urgent = URGENT_UPDATE_NEEDED.load(Ordering::SeqCst);
    urgent
}

/// Lowers the urgent-update flag. The version-mismatch state is not touched.
pub fn clear_urgent_update() {
    let urgent = false;
    URGENT_UPDATE_NEEDED.store(urgent, Ordering::SeqCst);
}
/// Last value published through [`set_open_connection_count`]; starts at 0.
static OPEN_CONNECTION_COUNT: AtomicUsize = AtomicUsize::new(0);

/// Publishes the current number of open connections, overwriting the
/// previously stored value.
pub fn set_open_connection_count(count: usize) {
    let counter = &OPEN_CONNECTION_COUNT;
    counter.store(count, Ordering::SeqCst);
}

/// Returns the most recently published open-connection count.
pub fn get_open_connection_count() -> usize {
    let counter = &OPEN_CONNECTION_COUNT;
    counter.load(Ordering::SeqCst)
}
// Submodules of this module. `pub` ones are part of the public API,
// `pub(crate)` ones are visible crate-wide, the rest are private.
pub mod connection_handler;
mod crypto;
pub mod in_memory_socket;
// `mock_transport` is re-exported only in test builds or when the `bench`
// feature is enabled.
#[cfg(any(test, feature = "bench"))]
pub use connection_handler::mock_transport;
pub mod fast_channel;
mod packet_data;
pub mod peer_connection;
mod received_packet_tracker;
pub(crate) mod bbr;
pub mod congestion_control;
pub(crate) mod fixed_rate;
pub mod global_bandwidth;
pub(crate) mod ledbat;
pub mod metrics;
mod sent_packet_tracker;
mod symmetric_message;
pub(crate) mod token_bucket;
// Convenience re-exports so callers can name these items from this module
// without spelling out the submodule path. Note that `ledbat` itself is only
// `pub(crate)`, so `LedbatStats` is publicly reachable only through this line.
pub use ledbat::LedbatStats;
pub use congestion_control::{
AlgorithmConfig, CongestionControl, CongestionControlAlgorithm, CongestionControlConfig,
CongestionControlStats, CongestionController,
};
pub use metrics::{TRANSPORT_METRICS, TransportMetrics, TransportSnapshot};
// `packet_data` is a private module; this re-export is the only public path
// to `reset_nonce_counter`.
pub use packet_data::reset_nonce_counter;
pub use peer_connection::StreamId;
use std::time::Duration;
/// Summary numbers describing one stream transfer to a remote address.
///
/// This file only reads `bytes_transferred` and `elapsed`; the precise
/// meaning of the remaining counters is defined by whoever fills the struct
/// in (NOTE(review): the cwnd/ssthresh/flightsize names suggest congestion
/// control state — confirm against the producer).
#[derive(Debug, Clone)]
pub struct TransferStats {
    pub stream_id: u64,
    pub remote_addr: SocketAddr,
    /// Numerator of [`TransferStats::avg_throughput_bps`].
    pub bytes_transferred: u64,
    /// Denominator of [`TransferStats::avg_throughput_bps`].
    pub elapsed: Duration,
    pub peak_cwnd_bytes: u32,
    pub final_cwnd_bytes: u32,
    pub slowdowns_triggered: u32,
    pub base_delay: Duration,
    pub final_ssthresh_bytes: u32,
    pub min_ssthresh_floor_bytes: u32,
    pub total_timeouts: u32,
    pub final_flightsize: u32,
    pub configured_rate: u32,
}

impl TransferStats {
    /// Average throughput over the whole transfer.
    ///
    /// Computed as `bytes_transferred / elapsed` in seconds, i.e. the unit is
    /// *bytes* per second despite the `bps` suffix. The fractional part is
    /// dropped. Returns 0 when `elapsed` is zero to avoid dividing by zero.
    pub fn avg_throughput_bps(&self) -> u64 {
        if self.elapsed.is_zero() {
            0
        } else {
            let seconds = self.elapsed.as_secs_f64();
            let bytes_per_second = self.bytes_transferred as f64 / seconds;
            bytes_per_second as u64
        }
    }
}
/// Module-private alias: message payloads are carried as `bytes::Bytes`.
type MessagePayload = bytes::Bytes;
/// Module-private alias: packet identifiers are plain `u32` values.
type PacketId = u32;
/// Newtype around a [`SocketAddr`].
///
/// The wrapped address is private; build one with `From<SocketAddr>` and read
/// it back with [`ObservedAddr::socket_addr`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ObservedAddr(SocketAddr);

impl ObservedAddr {
    /// Returns a copy of the wrapped socket address.
    pub fn socket_addr(&self) -> SocketAddr {
        let Self(inner) = *self;
        inner
    }
}

impl std::fmt::Display for ObservedAddr {
    /// Prints exactly what the wrapped `SocketAddr` prints with `{}`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(inner) = self;
        f.write_fmt(format_args!("{}", inner))
    }
}

impl From<SocketAddr> for ObservedAddr {
    /// Wraps `addr` without modifying it.
    fn from(addr: SocketAddr) -> Self {
        ObservedAddr(addr)
    }
}
// Re-exports from the submodules so the handler, key, connection and
// streaming types can be named directly from this module.
// `ExpectedInboundTracker` is exposed to the rest of the crate only.
pub(crate) use self::connection_handler::ExpectedInboundTracker;
pub use self::connection_handler::create_connection_handler;
// `crypto` is a private module; these two types are its public surface here.
pub use self::crypto::{TransportKeypair, TransportPublicKey};
pub use self::{
connection_handler::{InboundConnectionHandler, OutboundConnectionHandler},
peer_connection::PeerConnection,
};
pub use self::peer_connection::{
streaming::{StreamError, StreamHandle, StreamRegistry, StreamingInboundStream},
streaming_buffer::{InsertError, LockFreeStreamBuffer},
};
/// Error type of this module.
///
/// `Display` text comes from the `#[error(...)]` attributes; the variants
/// marked `transparent` forward to the wrapped error, and `#[from]` gives
/// them a `From` conversion so `?` can be used on the underlying error types.
#[derive(Debug, thiserror::Error)]
#[cfg_attr(feature = "bench", allow(dead_code))]
pub enum TransportError {
/// The transport handler channel is closed.
#[error("transport handler channel closed, socket likely closed")]
ChannelClosed,
/// The connection to the given remote address is closed. The address is
/// carried in the variant but is not part of the message text.
#[error("connection to remote closed")]
ConnectionClosed(SocketAddr),
/// Establishing a connection failed; `cause` holds a human-readable reason.
#[error("failed while establishing connection, reason: {cause}")]
ConnectionEstablishmentFailure { cause: Cow<'static, str> },
/// Version mismatch. In the message, `actual` is printed as the client
/// version and `expected` as the gateway version, followed by upgrade
/// instructions.
#[error(
"Version incompatibility with gateway\n Your client version: {actual}\n Gateway version: {expected}\n \n To fix this, update your Freenet client:\n cargo install --force freenet --version {expected}\n \n Or if building from source:\n git pull && cargo install --path crates/core"
)]
ProtocolVersionMismatch { expected: String, actual: String },
/// Sending to the given address failed with the given I/O error kind.
/// This is the only variant `is_transient_send_failure` reports as transient.
#[error("send to {0} failed: {1}")]
SendFailed(SocketAddr, std::io::ErrorKind),
/// Any other I/O error.
#[error(transparent)]
IO(#[from] std::io::Error),
/// Catch-all for errors converted from `anyhow::Error`.
#[error(transparent)]
Other(#[from] anyhow::Error),
/// Error converted from `crypto::DecryptionError`.
#[error(transparent)]
PubKeyDecryptionError(#[from] crypto::DecryptionError),
/// Error converted from `bincode::Error`.
#[error(transparent)]
Serialization(#[from] bincode::Error),
}
impl TransportError {
pub fn is_transient_send_failure(&self) -> bool {
matches!(self, TransportError::SendFailed(..))
}
}
/// Socket interface used by this module; implemented in this file for
/// `tokio::net::UdpSocket`.
///
/// The async operations are declared as functions returning `Send` futures.
pub trait Socket: Sized + Send + Sync + 'static {
/// Creates a socket bound to `addr`.
fn bind(addr: SocketAddr) -> impl Future<Output = io::Result<Self>> + Send;
/// Receives into `buf` and resolves to a `(usize, SocketAddr)` pair — in the
/// `UdpSocket` implementation these are the byte count and the sender address.
fn recv_from(
&self,
buf: &mut [u8],
) -> impl Future<Output = io::Result<(usize, SocketAddr)>> + Send;
/// Sends `buf` to `target` and resolves to a `usize` (the value returned by
/// the underlying send in the `UdpSocket` implementation).
fn send_to(
&self,
buf: &[u8],
target: SocketAddr,
) -> impl Future<Output = io::Result<usize>> + Send;
/// Synchronous counterpart of `send_to`: returns the result directly instead
/// of a future.
#[allow(dead_code)]
fn send_to_blocking(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize>;
}
/// Turns an IPv4-mapped IPv6 socket address (`::ffff:a.b.c.d`) into the
/// equivalent plain IPv4 socket address, keeping the port.
///
/// Every other address — native IPv4 or non-mapped IPv6 — is returned as is.
pub fn normalize_mapped_addr(addr: SocketAddr) -> SocketAddr {
    match addr {
        SocketAddr::V6(v6) => match v6.ip().to_ipv4_mapped() {
            Some(embedded_v4) => SocketAddr::new(std::net::IpAddr::V4(embedded_v4), v6.port()),
            None => addr,
        },
        SocketAddr::V4(_) => addr,
    }
}
/// Picks the address form to hand to the OS when sending.
///
/// When the local socket is IPv6 (`local_is_ipv6`) and `target` is IPv4, the
/// target is rewritten to its IPv4-mapped IPv6 form (`::ffff:a.b.c.d`) with
/// the same port. In every other combination `target` is returned unchanged.
fn map_addr_for_send(local_is_ipv6: bool, target: SocketAddr) -> SocketAddr {
    match target {
        SocketAddr::V4(v4) if local_is_ipv6 => {
            let mapped_ip = std::net::IpAddr::V6(v4.ip().to_ipv6_mapped());
            SocketAddr::new(mapped_ip, v4.port())
        }
        _ => target,
    }
}
// `Socket` implementation on top of `tokio::net::UdpSocket`.
// Addresses are translated at the boundary: received IPv4-mapped IPv6
// sources are normalized to plain IPv4, and IPv4 targets are mapped to IPv6
// form when the local socket is IPv6.
impl Socket for UdpSocket {
/// Creates the socket through `socket2` so options can be set before it is
/// bound, then hands it to tokio.
///
/// Errors from socket creation and from `bind` are re-wrapped with a
/// descriptive message (same `ErrorKind`); errors from setting the socket
/// options are propagated unchanged.
async fn bind(addr: SocketAddr) -> io::Result<Self> {
let is_ipv6 = addr.is_ipv6();
// The socket domain follows the address family of `addr`.
let domain = if is_ipv6 {
socket2::Domain::IPV6
} else {
socket2::Domain::IPV4
};
let sock = socket2::Socket::new(domain, socket2::Type::DGRAM, Some(socket2::Protocol::UDP))
.map_err(|e| io::Error::new(e.kind(), format!("Failed to create UDP socket: {e}")))?;
if is_ipv6 {
// Turn off "IPv6 only" so the IPv6 socket also carries IPv4 traffic
// (dual-stack); the dual-stack tests in this file depend on this.
sock.set_only_v6(false)?;
}
// Switched to non-blocking before the hand-over to tokio below.
// NOTE(review): tokio's `from_std` expects a non-blocking socket — see its docs.
sock.set_nonblocking(true)?;
sock.bind(&addr.into()).map_err(|e| {
io::Error::new(
e.kind(),
format!("Failed to bind UDP socket to {addr}: {e}"),
)
})?;
let std_socket: std::net::UdpSocket = sock.into();
Self::from_std(std_socket)
}
/// Receives one datagram and reports its source with IPv4-mapped IPv6
/// addresses converted to plain IPv4.
async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
// Method-call syntax picks the inherent `UdpSocket::recv_from` over this
// trait method, so this does not recurse.
let (len, addr) = self.recv_from(buf).await?;
Ok((len, normalize_mapped_addr(addr)))
}
/// Sends `buf` to `target`, mapping an IPv4 target to IPv6 form first when
/// the local socket is IPv6.
async fn send_to(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize> {
// If the local address cannot be queried, the socket is treated as IPv4
// and the target is passed through unmapped.
let local_is_ipv6 = self.local_addr().map(|a| a.is_ipv6()).unwrap_or(false);
let target = map_addr_for_send(local_is_ipv6, target);
// As in `recv_from`, this resolves to the inherent tokio method.
self.send_to(buf, target).await
}
/// Synchronous send built on `try_send_to`.
///
/// While the socket reports `WouldBlock` the calling thread sleeps and
/// retries, doubling the pause from 1 µs up to a cap of 1000 µs. There is no
/// retry limit. Any other error is returned immediately.
fn send_to_blocking(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize> {
let local_is_ipv6 = self.local_addr().map(|a| a.is_ipv6()).unwrap_or(false);
let target = map_addr_for_send(local_is_ipv6, target);
// Initial pause in microseconds and the ceiling it is clamped to.
let mut backoff_us = 1u64; const MAX_BACKOFF_US: u64 = 1000;
loop {
match self.try_send_to(buf, target) {
Ok(n) => return Ok(n),
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
// Blocks the current OS thread (not an async sleep).
std::thread::sleep(std::time::Duration::from_micros(backoff_us));
backoff_us = (backoff_us * 2).min(MAX_BACKOFF_US);
}
Err(e) => return Err(e),
}
}
}
}
use crate::message::NetMessage;
/// Crate-internal interface to a peer connection.
///
/// Asynchronous operations return pinned, boxed `Send` futures that borrow
/// `self` mutably for their lifetime (presumably so the trait can be used as
/// a trait object — confirm against the call sites).
pub(crate) trait PeerConnectionApi: Send {
/// Address of the remote end of this connection.
fn remote_addr(&self) -> SocketAddr;
/// Sends `msg` to the peer; the future resolves to `Ok(())` or a
/// `TransportError`.
fn send_message(
&mut self,
msg: NetMessage,
) -> std::pin::Pin<Box<dyn Future<Output = Result<(), TransportError>> + Send + '_>>;
/// Waits for inbound data; the future resolves to the received bytes or a
/// `TransportError`.
fn recv(
&mut self,
) -> std::pin::Pin<Box<dyn Future<Output = Result<Vec<u8>, TransportError>> + Send + '_>>;
/// Installs the shared `OrphanStreamRegistry` on this connection.
/// NOTE(review): how the registry is used is defined by implementors — not
/// visible here.
fn set_orphan_stream_registry(
&mut self,
registry: std::sync::Arc<crate::operations::orphan_streams::OrphanStreamRegistry>,
);
/// Sends `data` on the stream identified by `stream_id`, with optional
/// `metadata` bytes.
/// NOTE(review): `completion_tx` presumably fires when the transfer is done —
/// confirm the exact moment against implementors.
fn send_stream_data(
&mut self,
stream_id: crate::transport::peer_connection::StreamId,
data: bytes::Bytes,
metadata: Option<bytes::Bytes>,
completion_tx: Option<tokio::sync::oneshot::Sender<()>>,
) -> std::pin::Pin<Box<dyn Future<Output = Result<(), TransportError>> + Send + '_>>;
/// Takes an inbound stream handle and an outbound stream id, with optional
/// `metadata` bytes.
/// NOTE(review): presumably forwards the inbound stream's data onto the
/// outbound stream — confirm against implementors.
fn pipe_stream_data(
&mut self,
outbound_stream_id: crate::transport::peer_connection::StreamId,
inbound_handle: crate::transport::peer_connection::streaming::StreamHandle,
metadata: Option<bytes::Bytes>,
) -> std::pin::Pin<Box<dyn Future<Output = Result<(), TransportError>> + Send + '_>>;
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::transport::received_packet_tracker::ReportResult;
    use crate::transport::sent_packet_tracker::ResendAction;

    /// Sends packets 1..=5, acknowledges 1, 3 and 5, advances the mock clock
    /// by the effective RTO and expects 2 and 4 to be offered for resend, in
    /// that order.
    #[test]
    fn test_packet_send_receive_acknowledge_flow() {
        // Packets the receiving side actually gets; 2 and 4 are "lost".
        const DELIVERED: [u32; 3] = [1, 3, 5];

        let mut sender = sent_packet_tracker::tests::mock_sent_packet_tracker();
        let mut receiver = received_packet_tracker::tests::mock_received_packet_tracker();
        let rto = sender.effective_rto();

        for id in 1..=5 {
            sender.report_sent_packet(id, vec![id as u8].into());
        }

        for id in DELIVERED {
            assert_eq!(receiver.report_received_packet(id), ReportResult::Ok);
        }

        let receipts = receiver.get_receipts();
        assert_eq!(receipts, DELIVERED.to_vec());
        let _ = sender.report_received_receipts(&receipts);

        // Move the mock clock forward by the RTO captured at the start.
        sender.time_source.advance(rto);

        for id in [2, 4] {
            match sender.get_resend() {
                ResendAction::WaitUntil(_) | ResendAction::TlpProbe(..) => {
                    panic!("Expected resend action for packet {id}")
                }
                ResendAction::Resend(packet_id, packet) => {
                    assert_eq!(packet_id, id);
                    sender.report_sent_packet(id, packet);
                }
            }
        }
    }
}
#[cfg(test)]
mod version_discovery_tests {
    use super::*;

    /// Serializes the tests in this module.
    ///
    /// They all mutate the process-wide `HIGHEST_SEEN_VERSION` state, and the
    /// default test harness runs tests on several threads at once. Without
    /// this lock one test's reports can land between another test's reset and
    /// its assertions, producing intermittent failures.
    static DISCOVERY_TEST_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Takes the module lock and clears the version-discovery state.
    ///
    /// The returned guard must be kept alive for the whole test. A poisoned
    /// lock (a previous test panicked while holding it) is recovered, since
    /// the state is reset here anyway.
    ///
    /// NOTE(review): tests outside this module that call
    /// `report_peer_version` are not covered by this lock.
    fn exclusive_clean_state() -> std::sync::MutexGuard<'static, ()> {
        let guard = DISCOVERY_TEST_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        reset_version_discovery();
        guard
    }

    /// One report is below the trust threshold.
    #[test]
    fn single_reporter_not_trusted() {
        let _serial = exclusive_clean_state();
        report_peer_version((0, 1, 153));
        assert_eq!(get_highest_seen_version(), None);
    }

    /// Two matching reports reach the threshold.
    #[test]
    fn two_reporters_trusted() {
        let _serial = exclusive_clean_state();
        report_peer_version((0, 1, 153));
        report_peer_version((0, 1, 153));
        assert_eq!(get_highest_seen_version(), Some((0, 1, 153)));
    }

    /// A newer version replaces the tracked one and must earn trust again.
    #[test]
    fn higher_version_resets_count() {
        let _serial = exclusive_clean_state();
        report_peer_version((0, 1, 153));
        report_peer_version((0, 1, 153));
        assert_eq!(get_highest_seen_version(), Some((0, 1, 153)));
        report_peer_version((0, 1, 154));
        assert_eq!(get_highest_seen_version(), None);
        report_peer_version((0, 1, 154));
        assert_eq!(get_highest_seen_version(), Some((0, 1, 154)));
    }

    /// Versions with a higher major component are handled like any other.
    #[test]
    fn major_minor_bumps_accepted() {
        let _serial = exclusive_clean_state();
        report_peer_version((1, 0, 0));
        report_peer_version((1, 0, 0));
        assert_eq!(get_highest_seen_version(), Some((1, 0, 0)));
    }
}
#[cfg(test)]
mod dual_stack_tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};

    /// `127.0.0.1:<port>`.
    fn v4_loopback(port: u16) -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port)
    }

    /// `[::1]:<port>`.
    fn v6_loopback(port: u16) -> SocketAddr {
        SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), port)
    }

    /// Binds a socket on `[::]:0` through the `Socket` trait.
    async fn bind_wildcard_v6() -> UdpSocket {
        let wildcard = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0);
        <UdpSocket as Socket>::bind(wildcard)
            .await
            .expect("bind to [::]:0 should succeed")
    }

    /// An IPv4 sender reaches the IPv6-wildcard socket, and the reported
    /// source address is plain IPv4.
    #[tokio::test]
    async fn udp_dual_stack_accepts_ipv4() {
        let receiver = bind_wildcard_v6().await;
        let receiver_port = receiver.local_addr().unwrap().port();

        let sender = <UdpSocket as Socket>::bind(v4_loopback(0)).await.unwrap();
        <UdpSocket as Socket>::send_to(&sender, b"hello", v4_loopback(receiver_port))
            .await
            .expect("send from IPv4 should succeed");

        let mut buf = [0u8; 16];
        let (len, src) = <UdpSocket as Socket>::recv_from(&receiver, &mut buf)
            .await
            .unwrap();
        assert_eq!(&buf[..len], b"hello");
        assert!(
            src.is_ipv4(),
            "IPv4 source should be normalized to plain IPv4, got {src}"
        );
    }

    /// An IPv6 sender reaches the IPv6-wildcard socket.
    #[tokio::test]
    async fn udp_dual_stack_accepts_ipv6() {
        let receiver = bind_wildcard_v6().await;
        let receiver_port = receiver.local_addr().unwrap().port();

        let sender = <UdpSocket as Socket>::bind(v6_loopback(0)).await.unwrap();
        <UdpSocket as Socket>::send_to(&sender, b"world", v6_loopback(receiver_port))
            .await
            .expect("send from IPv6 should succeed");

        let mut buf = [0u8; 16];
        let (len, _src) = <UdpSocket as Socket>::recv_from(&receiver, &mut buf)
            .await
            .unwrap();
        assert_eq!(&buf[..len], b"world");
    }

    #[test]
    fn normalize_mapped_addr_converts_ipv4_mapped() {
        let input: SocketAddr = "[::ffff:127.0.0.1]:1234".parse().unwrap();
        let expected: SocketAddr = "127.0.0.1:1234".parse().unwrap();
        assert_eq!(normalize_mapped_addr(input), expected);
    }

    #[test]
    fn normalize_mapped_addr_preserves_native_ipv4() {
        let native: SocketAddr = "1.2.3.4:5678".parse().unwrap();
        assert_eq!(normalize_mapped_addr(native), native);
    }

    #[test]
    fn normalize_mapped_addr_preserves_native_ipv6() {
        let native: SocketAddr = "[2001:db8::1]:9999".parse().unwrap();
        assert_eq!(normalize_mapped_addr(native), native);
    }

    #[test]
    fn map_addr_for_send_maps_ipv4_on_ipv6_socket() {
        let target: SocketAddr = "1.2.3.4:5678".parse().unwrap();
        let mapped = map_addr_for_send(true, target);
        assert!(mapped.is_ipv6());
        match mapped {
            SocketAddr::V6(v6) => {
                assert_eq!(v6.ip().to_ipv4_mapped(), Some(Ipv4Addr::new(1, 2, 3, 4)));
                assert_eq!(v6.port(), 5678);
            }
            // Already ruled out by the assertion above.
            SocketAddr::V4(_) => {}
        }
    }

    #[test]
    fn map_addr_for_send_noop_on_ipv4_socket() {
        let target: SocketAddr = "1.2.3.4:5678".parse().unwrap();
        assert_eq!(map_addr_for_send(false, target), target);
    }

    /// Full round trip: IPv4 -> dual-stack socket, then a reply sent from the
    /// dual-stack socket back to the normalized IPv4 source.
    #[tokio::test]
    async fn udp_dual_stack_roundtrip_ipv4() {
        let dual = bind_wildcard_v6().await;
        let dual_port = dual.local_addr().unwrap().port();

        let v4_peer = <UdpSocket as Socket>::bind(v4_loopback(0)).await.unwrap();
        let v4_peer_port = v4_peer.local_addr().unwrap().port();

        <UdpSocket as Socket>::send_to(&v4_peer, b"ping", v4_loopback(dual_port))
            .await
            .unwrap();

        let mut buf = [0u8; 16];
        let (len, src) = <UdpSocket as Socket>::recv_from(&dual, &mut buf)
            .await
            .unwrap();
        assert_eq!(&buf[..len], b"ping");
        assert!(src.is_ipv4(), "source should be normalized to IPv4");

        // Reply to the normalized source IP on the peer's bound port.
        let reply_to = SocketAddr::new(src.ip(), v4_peer_port);
        <UdpSocket as Socket>::send_to(&dual, b"pong", reply_to)
            .await
            .expect("sending to normalized IPv4 addr from IPv6 socket should work");

        let (len, _) = <UdpSocket as Socket>::recv_from(&v4_peer, &mut buf)
            .await
            .unwrap();
        assert_eq!(&buf[..len], b"pong");
    }
}