use std::{
fmt,
io::{self, IoSliceMut},
net::{IpAddr, Ipv6Addr, SocketAddr, SocketAddrV6},
num::NonZeroUsize,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
use bytes::Bytes;
use iroh_base::{CustomAddr, EndpointId, RelayUrl, TransportAddr};
use iroh_relay::RelayMap;
use n0_watcher::Watcher;
use noq_proto::PathStatus;
use relay::{RelayNetworkChangeSender, RelaySender};
use rustc_hash::FxHashMap;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, instrument, trace, warn};
use super::{Socket, mapped_addrs::MultipathMappedAddr};
use crate::{metrics::EndpointMetrics, net_report::Report};
pub(crate) mod custom;
#[cfg(not(wasm_browser))]
mod ip;
mod relay;
use custom::{CustomEndpoint, CustomSender, CustomTransport};
#[cfg(not(wasm_browser))]
pub(crate) use self::ip::Config as IpConfig;
#[cfg(not(wasm_browser))]
use self::ip::{IpNetworkChangeSender, IpTransports, IpTransportsSender};
pub(crate) use self::relay::{HomeRelayStatus, HomeRelayWatch, RelayActorConfig, RelayTransport};
#[derive(Debug)]
pub(crate) struct Transports {
#[cfg(not(wasm_browser))]
ip: IpTransports,
relay: Vec<RelayTransport>,
custom: Vec<Box<dyn CustomEndpoint>>,
poll_recv_counter: usize,
source_addrs: [Addr; noq_udp::BATCH_SIZE],
}
type IpTransportsWatcher = n0_watcher::Join<SocketAddr, n0_watcher::Direct<SocketAddr>>;
type CustomTransportsWatcher =
n0_watcher::Join<Vec<CustomAddr>, n0_watcher::Direct<Vec<CustomAddr>>>;
type RelayTransportsWatcher = n0_watcher::Join<
Option<(RelayUrl, EndpointId)>,
n0_watcher::Map<
n0_watcher::Direct<Option<(RelayUrl, HomeRelayStatus)>>,
Option<(RelayUrl, EndpointId)>,
>,
>;
pub(super) type HomeRelayWatcher = n0_watcher::Join<
Option<(RelayUrl, HomeRelayStatus)>,
n0_watcher::Direct<Option<(RelayUrl, HomeRelayStatus)>>,
>;
#[cfg(not(wasm_browser))]
pub(crate) type LocalAddrsWatch = n0_watcher::Map<
n0_watcher::Tuple<
n0_watcher::Tuple<IpTransportsWatcher, CustomTransportsWatcher>,
RelayTransportsWatcher,
>,
Vec<Addr>,
>;
#[cfg(wasm_browser)]
pub(crate) type LocalAddrsWatch =
n0_watcher::Map<n0_watcher::Tuple<CustomTransportsWatcher, RelayTransportsWatcher>, Vec<Addr>>;
#[derive(Debug, Clone)]
#[non_exhaustive]
pub(crate) enum TransportConfig {
#[cfg(not(wasm_browser))]
Ip {
config: ip::Config,
is_user_defined: bool,
},
Relay {
relay_map: RelayMap,
is_user_defined: bool,
},
#[cfg_attr(not(feature = "unstable-custom-transports"), allow(dead_code))]
Custom(Arc<dyn CustomTransport>),
}
impl TransportConfig {
#[cfg(not(wasm_browser))]
pub(crate) fn default_ipv4() -> Self {
use std::net::Ipv4Addr;
use ipnet::Ipv4Net;
Self::Ip {
config: ip::Config::V4 {
ip_net: Ipv4Net::new(Ipv4Addr::UNSPECIFIED, 0).expect("checked"),
port: 0,
is_required: true,
is_default: false,
},
is_user_defined: false,
}
}
#[cfg(not(wasm_browser))]
pub(crate) fn default_ipv6() -> Self {
use ipnet::Ipv6Net;
Self::Ip {
config: ip::Config::V6 {
ip_net: Ipv6Net::new(Ipv6Addr::UNSPECIFIED, 0).expect("checked"),
scope_id: 0,
port: 0,
is_required: false,
is_default: false,
},
is_user_defined: false,
}
}
#[cfg(not(wasm_browser))]
pub(crate) fn is_ipv4_default(&self) -> bool {
match self {
Self::Ip { config, .. } => config.is_default() && config.is_ipv4(),
_ => false,
}
}
#[cfg(not(wasm_browser))]
pub(crate) fn is_ipv6_default(&self) -> bool {
match self {
Self::Ip { config, .. } => config.is_default() && config.is_ipv6(),
_ => false,
}
}
pub(crate) fn is_user_defined(&self) -> bool {
match self {
#[cfg(not(wasm_browser))]
Self::Ip {
is_user_defined, ..
} => *is_user_defined,
Self::Relay {
is_user_defined, ..
} => *is_user_defined,
Self::Custom(_) => true,
}
}
}
impl Transports {
pub(crate) fn bind(
configs: &[TransportConfig],
relay_actor_config: RelayActorConfig,
metrics: &EndpointMetrics,
shutdown_token: CancellationToken,
) -> io::Result<Self> {
#[cfg(not(wasm_browser))]
let ip_configs = {
let mut ip_configs = Vec::new();
let has_ipv4_default = configs
.iter()
.any(|t| t.is_ipv4_default() && t.is_user_defined());
let has_ipv6_default = configs
.iter()
.any(|t| t.is_ipv6_default() && t.is_user_defined());
for config in configs {
if let TransportConfig::Ip {
config,
is_user_defined,
} = config
{
if !is_user_defined
&& (config.is_ipv4() && has_ipv4_default
|| config.is_ipv6() && has_ipv6_default)
{
continue;
}
ip_configs.push(*config);
}
}
ip_configs
};
#[cfg(not(wasm_browser))]
let ip = IpTransports::bind(ip_configs.into_iter(), metrics)?;
let relay = configs
.iter()
.filter(|t| matches!(t, TransportConfig::Relay { .. }))
.map(|_c| RelayTransport::new(relay_actor_config.clone(), shutdown_token.child_token()))
.collect();
let mut custom = Vec::new();
for config in configs.iter().filter_map(|t| {
if let TransportConfig::Custom(config) = t {
Some(config)
} else {
None
}
}) {
let transport = config.bind()?;
custom.push(transport);
}
Ok(Self {
#[cfg(not(wasm_browser))]
ip,
relay,
custom,
poll_recv_counter: Default::default(),
source_addrs: Default::default(),
})
}
pub(crate) fn poll_recv(
&mut self,
cx: &mut Context,
bufs: &mut [io::IoSliceMut<'_>],
metas: &mut [noq_udp::RecvMeta],
sock: &Socket,
) -> Poll<io::Result<usize>> {
debug_assert_eq!(bufs.len(), metas.len(), "non matching bufs & metas");
debug_assert!(bufs.len() <= noq_udp::BATCH_SIZE, "too many buffers");
if sock.is_closing() {
return Poll::Pending;
}
match self.inner_poll_recv(cx, bufs, metas)? {
Poll::Pending | Poll::Ready(0) => Poll::Pending,
Poll::Ready(n) => {
sock.process_datagrams(&mut bufs[..n], &mut metas[..n], &self.source_addrs[..n]);
Poll::Ready(Ok(n))
}
}
}
fn inner_poll_recv(
&mut self,
cx: &mut Context,
bufs: &mut [IoSliceMut<'_>],
metas: &mut [noq_udp::RecvMeta],
) -> Poll<io::Result<usize>> {
debug_assert_eq!(bufs.len(), metas.len(), "non matching bufs & metas");
macro_rules! poll_transport {
($socket:expr) => {
match $socket.poll_recv(cx, bufs, metas, &mut self.source_addrs)? {
Poll::Pending | Poll::Ready(0) => {}
Poll::Ready(n) => {
return Poll::Ready(Ok(n));
}
}
};
}
let counter = self.poll_recv_counter.wrapping_add(1);
if counter.is_multiple_of(2) {
#[cfg(not(wasm_browser))]
poll_transport!(&mut self.ip);
for transport in self.relay.iter_mut() {
poll_transport!(transport);
}
for transport in self.custom.iter_mut() {
poll_transport!(transport);
}
} else {
for transport in self.custom.iter_mut().rev() {
poll_transport!(transport);
}
for transport in self.relay.iter_mut().rev() {
poll_transport!(transport);
}
#[cfg(not(wasm_browser))]
poll_transport!(&mut self.ip);
}
Poll::Pending
}
pub(crate) fn local_addrs(&self) -> Vec<Addr> {
self.local_addrs_watch().get()
}
pub(super) fn home_relay_watch(&self) -> HomeRelayWatcher {
n0_watcher::Join::new(self.relay.iter().map(|t| t.my_relay_status()))
}
#[cfg(not(wasm_browser))]
pub(crate) fn local_addrs_watch(&self) -> LocalAddrsWatch {
let ips = n0_watcher::Join::new(self.ip.iter().map(|t| t.local_addr_watch()));
let relays = n0_watcher::Join::new(self.relay.iter().map(|t| t.local_addr_watch()));
let custom = n0_watcher::Join::new(self.custom.iter().map(|t| t.watch_local_addrs()));
ips.or(custom).or(relays).map(|((ips, custom), relays)| {
let ips = ips.into_iter().map(Addr::from);
let custom = custom.into_iter().flatten().map(Addr::from);
let relays = relays
.into_iter()
.flatten()
.map(|(relay_url, endpoint_id)| Addr::Relay(relay_url, endpoint_id));
ips.chain(custom).chain(relays).collect()
})
}
#[cfg(wasm_browser)]
pub(crate) fn local_addrs_watch(&self) -> LocalAddrsWatch {
let relays = n0_watcher::Join::new(self.relay.iter().map(|t| t.local_addr_watch()));
let custom = n0_watcher::Join::new(self.custom.iter().map(|t| t.watch_local_addrs()));
custom.or(relays).map(|(custom, relays)| {
let custom = custom.into_iter().flatten().map(Addr::from);
let relays = relays
.into_iter()
.flatten()
.map(|(relay_url, endpoint_id)| Addr::Relay(relay_url, endpoint_id));
custom.chain(relays).collect()
})
}
#[cfg(not(wasm_browser))]
pub(crate) fn ip_bind_addrs(&self) -> Vec<SocketAddr> {
self.ip.iter().map(|t| t.bind_addr()).collect()
}
#[cfg(not(wasm_browser))]
pub(crate) fn max_transmit_segments(&self) -> NonZeroUsize {
let ip = self.ip.iter().map(|t| t.max_transmit_segments());
let custom = self.custom.iter().map(|t| t.max_transmit_segments());
ip.chain(custom).min().unwrap_or(NonZeroUsize::MIN)
}
#[cfg(wasm_browser)]
pub(crate) fn max_transmit_segments(&self) -> NonZeroUsize {
self.custom
.iter()
.map(|t| t.max_transmit_segments())
.min()
.unwrap_or(NonZeroUsize::MIN)
}
#[cfg(not(wasm_browser))]
pub(crate) fn max_receive_segments(&self) -> NonZeroUsize {
let res = self.ip.iter().map(|t| t.max_receive_segments()).max();
res.unwrap_or(NonZeroUsize::MIN)
}
#[cfg(wasm_browser)]
pub(crate) fn max_receive_segments(&self) -> NonZeroUsize {
NonZeroUsize::MIN
}
#[cfg(not(wasm_browser))]
pub(crate) fn may_fragment(&self) -> bool {
self.ip.iter().any(|t| t.may_fragment())
}
#[cfg(wasm_browser)]
pub(crate) fn may_fragment(&self) -> bool {
false
}
pub(crate) fn create_sender(&self) -> TransportsSender {
#[cfg(not(wasm_browser))]
let ip = self.ip.create_sender();
let relay = self.relay.iter().map(|t| t.create_sender()).collect();
let custom = self.custom.iter().map(|t| t.create_sender()).collect();
let max_transmit_segments = self.max_transmit_segments();
TransportsSender {
#[cfg(not(wasm_browser))]
ip,
relay,
custom,
max_transmit_segments,
}
}
pub(crate) fn create_network_change_sender(&self) -> NetworkChangeSender {
NetworkChangeSender {
#[cfg(not(wasm_browser))]
ip: self
.ip
.iter()
.map(|t| t.create_network_change_sender())
.collect(),
relay: self
.relay
.iter()
.map(|t| t.create_network_change_sender())
.collect(),
}
}
}
#[derive(Debug)]
pub(crate) struct NetworkChangeSender {
#[cfg(not(wasm_browser))]
ip: Vec<IpNetworkChangeSender>,
relay: Vec<RelayNetworkChangeSender>,
}
impl NetworkChangeSender {
pub(crate) fn on_network_change(&self, report: &Report) {
#[cfg(not(wasm_browser))]
for ip in &self.ip {
ip.on_network_change(report);
}
for relay in &self.relay {
relay.on_network_change(report);
}
}
pub(crate) fn check_relay_connection(&self) {
for relay in &self.relay {
relay.check_connection_after_network_change();
}
}
pub(crate) fn rebind(&self) -> std::io::Result<()> {
let mut res = Ok(());
#[cfg(not(wasm_browser))]
for transport in &self.ip {
if let Err(err) = transport.rebind() {
warn!("failed to rebind {:?}", err);
res = Err(err);
}
}
for transport in &self.relay {
if let Err(err) = transport.rebind() {
warn!("failed to rebind {:?}", err);
res = Err(err);
}
}
res
}
}
#[derive(Debug, Clone)]
pub struct Transmit<'a> {
pub(crate) ecn: Option<noq_udp::EcnCodepoint>,
pub contents: &'a [u8],
pub segment_size: Option<usize>,
}
impl<'a> Transmit<'a> {
fn datagram_count(&self) -> usize {
match self.segment_size {
None => 1,
Some(size) => self.contents.len().div_ceil(size),
}
}
}
#[derive(Debug, Clone)]
pub(crate) struct OwnedTransmit {
pub(crate) ecn: Option<noq_udp::EcnCodepoint>,
pub(crate) contents: Bytes,
pub(crate) segment_size: Option<usize>,
}
impl From<&noq_udp::Transmit<'_>> for OwnedTransmit {
fn from(source: &noq_udp::Transmit<'_>) -> Self {
Self {
ecn: source.ecn,
contents: Bytes::copy_from_slice(source.contents),
segment_size: source.segment_size,
}
}
}
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Addr {
Ip(SocketAddr),
Relay(RelayUrl, EndpointId),
Custom(CustomAddr),
}
impl fmt::Debug for Addr {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Addr::Ip(addr) => write!(f, "Ip({addr})"),
Addr::Relay(url, node_id) => write!(f, "Relay({url}, {})", node_id.fmt_short()),
Addr::Custom(custom_addr) => write!(f, "Custom({custom_addr:?})"),
}
}
}
impl Default for Addr {
fn default() -> Self {
Self::Ip(SocketAddr::V6(SocketAddrV6::new(
Ipv6Addr::UNSPECIFIED,
0,
0,
0,
)))
}
}
impl From<SocketAddr> for Addr {
fn from(value: SocketAddr) -> Self {
match value {
SocketAddr::V4(_) => Self::Ip(value),
SocketAddr::V6(addr) => {
Self::Ip(SocketAddr::new(addr.ip().to_canonical(), addr.port()))
}
}
}
}
impl From<&SocketAddr> for Addr {
fn from(value: &SocketAddr) -> Self {
match value {
SocketAddr::V4(_) => Self::Ip(*value),
SocketAddr::V6(addr) => {
Self::Ip(SocketAddr::new(addr.ip().to_canonical(), addr.port()))
}
}
}
}
impl From<CustomAddr> for Addr {
fn from(value: CustomAddr) -> Self {
Self::Custom(value)
}
}
impl From<(RelayUrl, EndpointId)> for Addr {
fn from(value: (RelayUrl, EndpointId)) -> Self {
Self::Relay(value.0, value.1)
}
}
impl From<Addr> for TransportAddr {
fn from(value: Addr) -> Self {
match value {
Addr::Ip(addr) => TransportAddr::Ip(addr),
Addr::Relay(url, _) => TransportAddr::Relay(url),
Addr::Custom(custom_addr) => TransportAddr::Custom(custom_addr),
}
}
}
impl Addr {
pub(crate) fn is_relay(&self) -> bool {
matches!(self, Self::Relay(..))
}
pub(crate) fn is_ip(&self) -> bool {
matches!(self, Self::Ip(_))
}
pub(crate) fn into_socket_addr(self) -> Option<SocketAddr> {
match self {
Self::Ip(ip) => Some(ip),
Self::Relay(..) => None,
Self::Custom(..) => None,
}
}
pub(crate) fn addr_kind(&self) -> AddrKind {
match self {
Self::Ip(addr) => match addr {
SocketAddr::V4(_) => AddrKind::IpV4,
SocketAddr::V6(_) => AddrKind::IpV6,
},
Self::Relay(_, _) => AddrKind::Relay,
Self::Custom(addr) => AddrKind::Custom(addr.id()),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AddrKind {
IpV4,
IpV6,
Relay,
Custom(u64),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) enum TransportType {
Primary,
Backup,
}
impl TransportType {
pub(super) fn to_path_status(self) -> PathStatus {
match self {
Self::Primary => PathStatus::Available,
Self::Backup => PathStatus::Backup,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct TransportBias {
pub(crate) transport_type: TransportType,
pub(crate) rtt_bias: i128,
}
impl TransportBias {
pub fn primary() -> Self {
Self {
transport_type: TransportType::Primary,
rtt_bias: 0,
}
}
pub(crate) fn backup() -> Self {
Self {
transport_type: TransportType::Backup,
rtt_bias: 0,
}
}
pub fn with_rtt_advantage(mut self, advantage: Duration) -> Self {
self.rtt_bias -= advantage.as_nanos() as i128;
self
}
pub fn with_rtt_disadvantage(mut self, disadvantage: Duration) -> Self {
self.rtt_bias += disadvantage.as_nanos() as i128;
self
}
}
#[derive(Debug, Clone)]
pub struct TransportBiasMap {
map: Arc<FxHashMap<AddrKind, TransportBias>>,
}
pub(super) const IPV6_RTT_ADVANTAGE: Duration = Duration::from_millis(3);
impl Default for TransportBiasMap {
fn default() -> Self {
let mut map = FxHashMap::default();
map.insert(AddrKind::IpV4, TransportBias::primary());
map.insert(
AddrKind::IpV6,
TransportBias::primary().with_rtt_advantage(IPV6_RTT_ADVANTAGE),
);
map.insert(AddrKind::Relay, TransportBias::backup());
Self { map: Arc::new(map) }
}
}
impl TransportBiasMap {
pub fn with_bias(self, kind: AddrKind, bias: TransportBias) -> Self {
let mut map = (*self.map).clone();
map.insert(kind, bias);
Self { map: Arc::new(map) }
}
pub fn get(&self, addr: &Addr) -> TransportBias {
self.map
.get(&addr.addr_kind())
.cloned()
.unwrap_or_else(TransportBias::primary)
}
pub fn path_selection_data(&self, addr: &Addr, rtt: Duration) -> PathSelectionData {
let bias = self.get(addr);
let tpe = bias.transport_type;
let biased_rtt = rtt.as_nanos() as i128 + bias.rtt_bias;
PathSelectionData {
transport_type: tpe,
rtt,
biased_rtt,
}
}
}
#[derive(Debug)]
pub struct PathSelectionData {
pub transport_type: TransportType,
pub rtt: Duration,
pub biased_rtt: i128,
}
impl PathSelectionData {
pub fn sort_key(&self) -> (u8, i128) {
(self.transport_type as u8, self.biased_rtt)
}
}
impl PartialEq<TransportAddr> for Addr {
fn eq(&self, other: &TransportAddr) -> bool {
match self {
Addr::Ip(socket_addr) => {
matches!(other, TransportAddr::Ip(a) if a == socket_addr)
}
Addr::Relay(relay_url, _) => {
matches!(other, TransportAddr::Relay(a) if a == relay_url)
}
Addr::Custom(custom_addr) => {
matches!(other, TransportAddr::Custom(a) if a == custom_addr)
}
}
}
}
#[derive(Debug, Clone)]
pub(crate) struct TransportsSender {
#[cfg(not(wasm_browser))]
ip: IpTransportsSender,
relay: Vec<RelaySender>,
custom: Vec<Arc<dyn CustomSender>>,
max_transmit_segments: NonZeroUsize,
}
impl TransportsSender {
#[instrument(name = "poll_send", skip(self, cx, transmit), fields(len = transmit.contents.len()))]
pub(crate) fn poll_send(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context,
dst: &Addr,
src: Option<IpAddr>,
transmit: &Transmit<'_>,
) -> Poll<io::Result<()>> {
match dst {
#[cfg(wasm_browser)]
Addr::Ip(..) => {
return Poll::Ready(Err(io::Error::other("IP is unsupported in browser")));
}
#[cfg(not(wasm_browser))]
Addr::Ip(dst_addr) => match dst_addr {
SocketAddr::V4(_) => {
if let Some(sender) = self
.ip
.v4_iter_mut()
.find(|s| s.is_valid_send_addr(src, dst_addr))
{
return Pin::new(sender).poll_send(cx, *dst_addr, src, transmit);
}
if let Some(sender) = self.ip.v4_default_mut()
&& sender.is_valid_default_addr(src, dst_addr)
{
return Pin::new(sender).poll_send(cx, *dst_addr, src, transmit);
}
}
SocketAddr::V6(_) => {
if let Some(sender) = self
.ip
.v6_iter_mut()
.find(|s| s.is_valid_send_addr(src, dst_addr))
{
return Pin::new(sender).poll_send(cx, *dst_addr, src, transmit);
}
if let Some(sender) = self.ip.v6_default_mut()
&& sender.is_valid_default_addr(src, dst_addr)
{
return Pin::new(sender).poll_send(cx, *dst_addr, src, transmit);
}
}
},
Addr::Relay(url, endpoint_id) => {
let mut has_valid_sender = false;
for sender in self
.relay
.iter_mut()
.filter(|s| s.is_valid_send_addr(url, endpoint_id))
{
has_valid_sender = true;
match sender.poll_send(cx, url.clone(), *endpoint_id, transmit) {
Poll::Pending => {}
Poll::Ready(res) => return Poll::Ready(res),
}
}
if has_valid_sender {
return Poll::Pending;
}
}
Addr::Custom(addr) => {
for sender in &mut self.custom {
if sender.is_valid_send_addr(addr) {
match sender.poll_send(cx, addr, transmit) {
Poll::Pending => {}
Poll::Ready(res) => return Poll::Ready(res),
}
}
}
}
}
trace!(?src, ?dst, "no valid transport available");
Poll::Ready(Ok(()))
}
}
#[derive(Debug)]
pub(crate) struct Transport {
sock: Arc<Socket>,
transports: Transports,
}
impl Transport {
pub(crate) fn new(sock: Arc<Socket>, transports: Transports) -> Self {
Self { sock, transports }
}
}
impl noq::AsyncUdpSocket for Transport {
fn create_sender(&self) -> Pin<Box<dyn noq::UdpSender>> {
Box::pin(Sender {
sock: self.sock.clone(),
sender: self.transports.create_sender(),
})
}
fn poll_recv(
&mut self,
cx: &mut Context,
bufs: &mut [IoSliceMut<'_>],
meta: &mut [noq_udp::RecvMeta],
) -> Poll<io::Result<usize>> {
self.transports.poll_recv(cx, bufs, meta, &self.sock)
}
#[cfg(not(wasm_browser))]
fn local_addr(&self) -> io::Result<SocketAddr> {
let local_addrs = self.transports.local_addrs();
let addrs: Vec<_> = local_addrs
.into_iter()
.map(|addr| {
use crate::socket::mapped_addrs::DEFAULT_FAKE_ADDR;
match addr {
Addr::Ip(addr) => addr,
Addr::Relay(..) => DEFAULT_FAKE_ADDR.into(),
Addr::Custom(_) => DEFAULT_FAKE_ADDR.into(),
}
})
.collect();
if let Some(addr) = addrs.iter().find(|addr| addr.is_ipv6()) {
return Ok(*addr);
}
if let Some(SocketAddr::V4(addr)) = addrs.first() {
let ip = addr.ip().to_ipv6_mapped().into();
return Ok(SocketAddr::new(ip, addr.port()));
}
if !self.transports.relay.is_empty() {
use crate::socket::mapped_addrs::DEFAULT_FAKE_ADDR;
return Ok(DEFAULT_FAKE_ADDR.into());
}
if !self.transports.custom.is_empty() {
use crate::socket::mapped_addrs::DEFAULT_FAKE_ADDR;
return Ok(DEFAULT_FAKE_ADDR.into());
}
Err(io::Error::other("no valid address available"))
}
#[cfg(wasm_browser)]
fn local_addr(&self) -> io::Result<SocketAddr> {
Ok(SocketAddr::new(std::net::Ipv6Addr::LOCALHOST.into(), 0))
}
fn max_receive_segments(&self) -> NonZeroUsize {
self.transports.max_receive_segments()
}
fn may_fragment(&self) -> bool {
self.transports.may_fragment()
}
}
#[derive(Debug)]
#[pin_project::pin_project]
pub(crate) struct Sender {
sock: Arc<Socket>,
#[pin]
sender: TransportsSender,
}
impl Sender {
fn mapped_addr(&self, transmit: &noq_udp::Transmit) -> io::Result<MultipathMappedAddr> {
if self.sock.is_closed() {
return Err(io::Error::new(
io::ErrorKind::NotConnected,
"connection closed",
));
}
Ok(MultipathMappedAddr::from(transmit.destination))
}
}
impl noq::UdpSender for Sender {
fn poll_send(
self: Pin<&mut Self>,
noq_transmit: &noq_udp::Transmit,
cx: &mut Context,
) -> Poll<io::Result<()>> {
let mapped_addr = self.mapped_addr(noq_transmit)?;
let transport_addr = match mapped_addr {
MultipathMappedAddr::Mixed(mapped_addr) => {
let Some(endpoint_id) = self.sock.mapped_addrs.endpoint_addrs.lookup(&mapped_addr)
else {
error!(dst = ?mapped_addr, "unknown NodeIdMappedAddr, dropped transmit");
return Poll::Ready(Ok(()));
};
if let Some(src_ip) = noq_transmit.src_ip {
warn!(dst = ?mapped_addr, ?src_ip, dst_endpoint = %endpoint_id.fmt_short(),
"oops, flub didn't think this would happen");
}
match self.sock.try_send_remote_state_msg(
endpoint_id,
super::RemoteStateMessage::SendDatagram(
Box::new(self.sender.clone()),
OwnedTransmit::from(noq_transmit),
),
) {
Ok(()) => {
trace!(dst = ?mapped_addr, dst_endpoint = %endpoint_id.fmt_short(), "sent transmit");
return Poll::Ready(Ok(()));
}
Err(msg) => {
debug!(
dst = ?mapped_addr,
dst_endpoint = %endpoint_id.fmt_short(),
?msg,
"RemoteStateActor inbox dropped message"
);
return Poll::Ready(Ok(()));
}
};
}
MultipathMappedAddr::Relay(relay_mapped_addr) => {
match self
.sock
.mapped_addrs
.relay_addrs
.lookup(&relay_mapped_addr)
{
Some((relay_url, endpoint_id)) => Addr::Relay(relay_url, endpoint_id),
None => {
error!("unknown RelayMappedAddr, dropped transmit");
return Poll::Ready(Ok(()));
}
}
}
MultipathMappedAddr::Custom(custom_mapped_addr) => {
match self
.sock
.mapped_addrs
.custom_addrs
.lookup(&custom_mapped_addr)
{
Some(addr) => Addr::Custom(addr),
None => {
error!("unknown CustomMappedAddr, dropped transmit");
return Poll::Ready(Ok(()));
}
}
}
MultipathMappedAddr::Ip(socket_addr) => {
let socket_addr =
SocketAddr::new(socket_addr.ip().to_canonical(), socket_addr.port());
Addr::Ip(socket_addr)
}
};
let transmit = Transmit {
ecn: noq_transmit.ecn,
contents: noq_transmit.contents,
segment_size: noq_transmit.segment_size,
};
let this = self.project();
match this
.sender
.poll_send(cx, &transport_addr, noq_transmit.src_ip, &transmit)
{
Poll::Ready(Ok(())) => {
trace!(
dst = ?transport_addr,
len = transmit.contents.len(),
datagram_count = transmit.datagram_count(),
"sent transmit"
);
Poll::Ready(Ok(()))
}
Poll::Ready(Err(ref err)) => {
warn!(dst=?transport_addr, "dropped transmit: {err:#}");
Poll::Ready(Ok(()))
}
Poll::Pending => {
trace!(dst=?transport_addr, "transport pending, dropped transmit");
Poll::Ready(Ok(()))
}
}
}
fn max_transmit_segments(&self) -> NonZeroUsize {
self.sender.max_transmit_segments
}
}
#[cfg(test)]
mod tests {
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6};
use iroh_base::{EndpointId, RelayUrl};
use super::*;
fn v4(port: u16) -> Addr {
Addr::Ip(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port)))
}
fn v6(port: u16) -> Addr {
Addr::Ip(SocketAddr::V6(SocketAddrV6::new(
Ipv6Addr::LOCALHOST,
port,
0,
0,
)))
}
fn relay(port: u16) -> Addr {
let url = format!("https://relay{port}.iroh.computer")
.parse::<RelayUrl>()
.unwrap();
Addr::Relay(url, EndpointId::from_bytes(&[0u8; 32]).unwrap())
}
#[test]
fn test_transport_bias_map_default() {
let bias_map = TransportBiasMap::default();
let v4_bias = bias_map.get(&v4(1));
assert_eq!(v4_bias.transport_type, TransportType::Primary);
assert_eq!(v4_bias.rtt_bias, 0);
let v6_bias = bias_map.get(&v6(1));
assert_eq!(v6_bias.transport_type, TransportType::Primary);
assert_eq!(v6_bias.rtt_bias, -(IPV6_RTT_ADVANTAGE.as_nanos() as i128));
let relay_bias = bias_map.get(&relay(1));
assert_eq!(relay_bias.transport_type, TransportType::Backup);
assert_eq!(relay_bias.rtt_bias, 0);
}
#[test]
fn test_ipv6_bias_gives_advantage() {
let bias_map = TransportBiasMap::default();
let rtt = Duration::from_millis(50);
let v4_bias = bias_map.get(&v4(1));
let v6_bias = bias_map.get(&v6(1));
let v4_biased_rtt = rtt.as_nanos() as i128 + v4_bias.rtt_bias;
let v6_biased_rtt = rtt.as_nanos() as i128 + v6_bias.rtt_bias;
assert!(v6_biased_rtt < v4_biased_rtt);
assert_eq!(
v4_biased_rtt - v6_biased_rtt,
IPV6_RTT_ADVANTAGE.as_nanos() as i128
);
}
#[test]
fn test_relay_is_backup() {
let bias_map = TransportBiasMap::default();
let relay_bias = bias_map.get(&relay(1));
assert_eq!(relay_bias.transport_type, TransportType::Backup);
let v4_bias = bias_map.get(&v4(1));
assert!(v4_bias.transport_type < relay_bias.transport_type);
}
}