use std::{
collections::{BTreeSet, VecDeque},
net::SocketAddr,
pin::Pin,
sync::Arc,
task::Poll,
};
use iroh_base::{CustomAddr, EndpointId, RelayUrl, TransportAddr};
use n0_error::StackResultExt;
use n0_future::{
FuturesUnordered, MergeUnbounded, Stream, StreamExt,
boxed::BoxStream,
task::JoinSet,
time::{self, Duration, Instant},
};
use n0_watcher::{Watchable, Watcher};
use noq::{ConnectionError, WeakConnectionHandle};
use noq_proto::{PathError, PathEvent, PathId, n0_nat_traversal};
use rustc_hash::FxHashMap;
use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, Level, Span, debug, error, event, info_span, instrument, trace, warn};
use self::path_state::RemotePathState;
pub(crate) use self::path_watcher::PathWatchable;
pub use self::{
path_watcher::{PathInfo, PathInfoList, PathInfoListIter, PathWatcher},
remote_info::{RemoteInfo, TransportAddrInfo, TransportAddrUsage},
};
use super::Source;
use crate::{
address_lookup::{AddressLookupFailed, AddressLookupServices, Item as AddressLookupItem},
endpoint::DirectAddr,
socket::{
Metrics as SocketMetrics, RELAY_PATH_MAX_IDLE_TIMEOUT,
mapped_addrs::{AddrMap, CustomMappedAddr, MappedAddr, RelayMappedAddr},
remote_map::{Private, to_transport_addr},
transports::{self, OwnedTransmit, PathSelectionData, TransportBiasMap, TransportsSender},
},
util::MaybeFuture,
};
mod path_state;
mod path_watcher;
mod remote_info;
/// Minimum time between two holepunch attempts when no new candidate addresses appeared
/// since the last attempt (see `trigger_holepunching`).
const HOLEPUNCH_ATTEMPTS_INTERVAL: Duration = Duration::from_secs(5);
/// RTT at or below which the best IP path of a connection counts as "good enough" in
/// `check_connections`, so that no further holepunching is triggered for it.
const GOOD_ENOUGH_LATENCY: Duration = Duration::from_millis(10);
/// Period of the timer in `run` that invokes `check_connections`.
const UPGRADE_INTERVAL: Duration = Duration::from_secs(60);
/// How long the actor may have no connections and an empty inbox before `run` terminates it.
const ACTOR_MAX_IDLE_TIMEOUT: Duration = Duration::from_secs(60);
/// Minimum biased-RTT improvement required before `select_best_path` switches away from
/// the current path to another path of the same transport type.
const RTT_SWITCHING_MIN_IP: Duration = Duration::from_millis(5);
/// Merged stream of the path events of all tracked connections.
///
/// Each item is tagged with the [`ConnId`] of the connection that produced it.
type PathEvents = MergeUnbounded<
Pin<Box<dyn Stream<Item = (ConnId, Result<PathEvent, noq::Lagged>)> + Send + Sync>>,
>;
/// Merged stream of the NAT traversal update events of all tracked connections.
///
/// Each item is tagged with the [`ConnId`] of the connection that produced it.
type AddrEvents = MergeUnbounded<
Pin<
Box<
dyn Stream<Item = (ConnId, Result<n0_nat_traversal::Event, noq::Lagged>)> + Send + Sync,
>,
>,
>;
/// Actor holding the state for a single remote endpoint.
///
/// It tracks the connections to the remote, the paths known for it, which path is
/// currently selected, and drives holepunching and address lookup for it.
pub(super) struct RemoteStateActor {
/// The remote endpoint this actor is responsible for.
endpoint_id: EndpointId,
/// Shared socket metrics, incremented as connections and paths come and go.
metrics: Arc<SocketMetrics>,
/// Watcher over our own direct addresses; used as the local NAT traversal candidates.
local_direct_addrs: n0_watcher::Direct<BTreeSet<DirectAddr>>,
/// Mapping between relay destinations and the mapped addresses handed to the QUIC layer.
relay_mapped_addrs: AddrMap<(RelayUrl, EndpointId), RelayMappedAddr>,
/// Mapping between custom transport addresses and the mapped addresses handed to the QUIC layer.
custom_mapped_addrs: AddrMap<CustomAddr, CustomMappedAddr>,
/// Services used to resolve addresses of the remote endpoint.
address_lookup: AddressLookupServices,
/// Per-connection state, keyed by connection id.
connections: FxHashMap<ConnId, ConnectionState>,
/// Futures that resolve when one of the tracked connections closes.
connections_close: FuturesUnordered<OnClosed>,
/// Path events of all tracked connections.
path_events: PathEvents,
/// NAT traversal address updates of all tracked connections.
addr_events: AddrEvents,
/// The paths (addresses) known for this remote.
paths: RemotePathState,
/// Details of the most recently initiated holepunch attempt, if any.
last_holepunch: Option<HolepunchAttempt>,
/// The currently selected path; shared with the `PathWatchable` of every connection.
selected_path: Watchable<Option<transports::Addr>>,
/// When set, the time at which `run` triggers holepunching again.
scheduled_holepunch: Option<Instant>,
/// When set, the time at which `run` retries opening the `pending_open_paths`.
scheduled_open_path: Option<Instant>,
/// Addresses for which opening a path must be retried at `scheduled_open_path`.
pending_open_paths: VecDeque<transports::Addr>,
/// The running address lookup, if any.
address_lookup_stream: Option<BoxStream<Result<AddressLookupItem, AddressLookupFailed>>>,
/// Per-transport bias used for path status and path selection.
transport_bias: TransportBiasMap,
}
impl RemoteStateActor {
#[allow(clippy::too_many_arguments)]
pub(super) fn new(
endpoint_id: EndpointId,
local_direct_addrs: n0_watcher::Direct<BTreeSet<DirectAddr>>,
relay_mapped_addrs: AddrMap<(RelayUrl, EndpointId), RelayMappedAddr>,
custom_mapped_addrs: AddrMap<CustomAddr, CustomMappedAddr>,
metrics: Arc<SocketMetrics>,
address_lookup: AddressLookupServices,
transport_bias: TransportBiasMap,
) -> Self {
Self {
endpoint_id,
metrics: metrics.clone(),
local_direct_addrs,
relay_mapped_addrs,
custom_mapped_addrs,
address_lookup,
connections: FxHashMap::default(),
connections_close: Default::default(),
path_events: Default::default(),
addr_events: Default::default(),
paths: RemotePathState::new(metrics),
last_holepunch: None,
selected_path: Default::default(),
scheduled_holepunch: None,
scheduled_open_path: None,
pending_open_paths: VecDeque::new(),
address_lookup_stream: None,
transport_bias,
}
}
/// Spawns the actor onto `tasks` and returns the sender side of its inbox.
///
/// `initial_msgs` are handled before the actor enters its main loop. The task runs
/// inside a `RemoteStateActor` span that is a child of `parent_span`, and resolves to
/// the endpoint id together with any messages left unhandled in the inbox.
pub(super) fn start(
    self,
    initial_msgs: Vec<RemoteStateMessage>,
    tasks: &mut JoinSet<(EndpointId, Vec<RemoteStateMessage>)>,
    shutdown_token: CancellationToken,
    parent_span: Span,
) -> mpsc::Sender<RemoteStateMessage> {
    let span = info_span!(
        parent: parent_span,
        "RemoteStateActor",
        remote = %self.endpoint_id.fmt_short(),
    );
    let (sender, receiver) = mpsc::channel(16);
    let actor = self
        .run(initial_msgs, receiver, shutdown_token)
        .instrument(span);
    tasks.spawn(actor);
    sender
}
/// The actor's main loop.
///
/// Handles `initial_msgs` first, then multiplexes over the inbox, connection events,
/// timers and the address lookup stream until it is cancelled, the inbox is closed,
/// the local address watcher disconnects, or the actor has been idle for
/// `ACTOR_MAX_IDLE_TIMEOUT`.
///
/// Returns the endpoint id and all messages that were still queued in the inbox when
/// the loop ended, so the caller can decide what to do with them.
async fn run(
mut self,
initial_msgs: Vec<RemoteStateMessage>,
mut inbox: mpsc::Receiver<RemoteStateMessage>,
shutdown_token: CancellationToken,
) -> (EndpointId, Vec<RemoteStateMessage>) {
trace!("actor started");
for msg in initial_msgs {
self.handle_message(msg).await;
}
let idle_timeout = time::sleep(ACTOR_MAX_IDLE_TIMEOUT);
n0_future::pin!(idle_timeout);
// Periodic check whether the connections should be upgraded via holepunching.
let check_connections = time::interval(UPGRADE_INTERVAL);
n0_future::pin!(check_connections);
loop {
// The scheduled timers are rebuilt on every iteration from the current deadlines,
// so handlers only need to update `scheduled_open_path` / `scheduled_holepunch`.
let scheduled_path_open = match self.scheduled_open_path {
Some(when) => MaybeFuture::Some(time::sleep_until(when)),
None => MaybeFuture::None,
};
n0_future::pin!(scheduled_path_open);
let scheduled_hp = match self.scheduled_holepunch {
Some(when) => MaybeFuture::Some(time::sleep_until(when)),
None => MaybeFuture::None,
};
n0_future::pin!(scheduled_hp);
// The actor is not idle while it has queued messages or connections: push the
// idle deadline out again.
if !inbox.is_empty() || !self.connections.is_empty() {
idle_timeout
.as_mut()
.reset(Instant::now() + ACTOR_MAX_IDLE_TIMEOUT);
}
tokio::select! {
// Branches are polled in the order written: shutdown first, then the inbox.
biased;
_ = shutdown_token.cancelled() => {
trace!("actor cancelled");
break;
}
msg = inbox.recv() => {
match msg {
Some(msg) => self.handle_message(msg).await,
// All senders are gone.
None => break,
}
}
Some((id, evt)) = self.path_events.next() => {
self.handle_path_event(id, evt);
}
Some((id, evt)) = self.addr_events.next() => {
trace!(?id, ?evt, "remote addrs updated, triggering holepunching");
self.trigger_holepunching();
}
Some((conn_id, reason)) = self.connections_close.next(), if !self.connections_close.is_empty() => {
self.handle_connection_close(conn_id, reason);
}
res = self.local_direct_addrs.updated() => {
if let Err(n0_watcher::Disconnected) = res {
trace!("direct address watcher disconnected, shutting down");
break;
}
self.update_local_direct_address();
trace!("local addrs updated, triggering holepunching");
self.trigger_holepunching();
}
_ = &mut scheduled_path_open => {
trace!("triggering scheduled path_open");
self.scheduled_open_path = None;
// Take the queue first: `open_path` may queue addresses again for a later retry.
let mut addrs = std::mem::take(&mut self.pending_open_paths);
while let Some(addr) = addrs.pop_front() {
self.open_path(&addr);
}
}
_ = &mut scheduled_hp => {
trace!("triggering scheduled holepunching");
self.scheduled_holepunch = None;
self.trigger_holepunching();
}
Some(item) = maybe_next(self.address_lookup_stream.as_mut()), if self.address_lookup_stream.is_some() => {
self.handle_address_lookup_item(item);
}
_ = check_connections.tick() => {
self.check_connections();
}
_ = &mut idle_timeout => {
// Re-check: state may have changed since the deadline was last reset.
if self.connections.is_empty() && inbox.is_empty() {
trace!("idle timeout expired and still idle: terminate actor");
break;
} else {
idle_timeout.as_mut().reset(Instant::now() + ACTOR_MAX_IDLE_TIMEOUT);
}
}
}
}
// Stop accepting new messages and hand the still-queued ones back to the caller.
inbox.close();
let mut leftover_msgs = Vec::with_capacity(inbox.len());
inbox.recv_many(&mut leftover_msgs, inbox.len()).await;
trace!("actor terminating");
(self.endpoint_id, leftover_msgs)
}
/// Dispatches a single inbox message to its handler.
///
/// `RemoteInfo` requests are answered inline; a dropped reply receiver is ignored.
#[instrument(skip(self))]
async fn handle_message(&mut self, msg: RemoteStateMessage) {
    match msg {
        RemoteStateMessage::RemoteInfo(reply) => {
            let info = RemoteInfo {
                endpoint_id: self.endpoint_id,
                addrs: self.paths.to_remote_addrs(),
            };
            // The requester may have gone away; nothing to do in that case.
            reply.send(info).ok();
        }
        RemoteStateMessage::NetworkChange { is_major } => {
            self.handle_msg_network_change(is_major);
        }
        RemoteStateMessage::ResolveRemote(addrs, tx) => {
            self.handle_msg_resolve_remote(addrs, tx);
        }
        RemoteStateMessage::AddConnection(handle, tx) => {
            self.handle_msg_add_connection(handle, tx);
        }
        RemoteStateMessage::SendDatagram(sender, transmit) => {
            self.handle_msg_send_datagram(sender, transmit).await;
        }
    }
}
/// Sends `transmit` to the remote.
///
/// With a selected path the datagram goes to that path only. Otherwise it is sent to
/// every known path, except IP paths that match one of our own direct addresses.
/// Send failures are logged and otherwise ignored.
async fn handle_msg_send_datagram(
    &mut self,
    mut sender: Box<TransportsSender>,
    transmit: OwnedTransmit,
) {
    if let Some(addr) = self.selected_path.get() {
        trace!(?addr, "sending datagram to selected path");
        let outcome = send_datagram(&mut sender, addr.clone(), transmit).await;
        if let Err(err) = outcome {
            debug!(?addr, "failed to send datagram on selected_path: {err:#}");
        }
        return;
    }

    // No selected path yet: try every path we know about.
    trace!(
        paths = ?self.paths.addrs().collect::<Vec<_>>(),
        "sending datagram to all known paths",
    );
    if self.paths.is_empty() {
        warn!("Cannot send datagrams: No paths to remote endpoint known");
    }
    for addr in self.paths.addrs() {
        if let transports::Addr::Ip(sockaddr) = addr {
            let is_own_addr = self
                .local_direct_addrs
                .peek()
                .iter()
                .any(|local| local.addr == *sockaddr);
            if is_own_addr {
                trace!(%sockaddr, "not sending datagram to our own address");
                continue;
            }
        }
        let outcome = send_datagram(&mut sender, addr.clone(), transmit.clone()).await;
        if let Err(err) = outcome {
            debug!(?addr, "failed to send datagram: {err:#}");
        }
    }
}
/// Registers a new connection with this actor and replies with its [`PathWatchable`].
///
/// If the weak `handle` can no longer be upgraded, the connection is not tracked; the
/// `PathWatchable` is sent on `tx` in either case.
fn handle_msg_add_connection(
&mut self,
handle: WeakConnectionHandle,
tx: oneshot::Sender<PathWatchable>,
) {
// The watchable shares the actor's selected path.
let path_watchable = PathWatchable::new(self.selected_path.clone());
if let Some(conn) = handle.upgrade() {
self.metrics.num_conns_opened.inc();
let conn_id = ConnId(conn.stable_id());
// Drop any state still stored under the same id before inserting the new one.
self.connections.remove(&conn_id);
// Subscribe to the connection's event streams, tagging every item with its id.
self.path_events
.push(Box::pin(conn.path_events().map(move |evt| (conn_id, evt))));
self.addr_events.push(Box::pin(
conn.nat_traversal_updates().map(move |evt| (conn_id, evt)),
));
self.connections_close.push(OnClosed::new(&conn));
// Make the connection's local NAT traversal addresses match our direct addresses.
let local_addrs = self
.local_direct_addrs
.get()
.iter()
.map(|d| d.addr)
.collect::<BTreeSet<_>>();
Self::update_qnt_candidates(&conn, &local_addrs);
let conn_state = self
.connections
.entry(conn_id)
.insert_entry(ConnectionState {
handle: handle.clone(),
path_watchable: path_watchable.clone(),
paths: Default::default(),
paths_by_addr: Default::default(),
has_been_direct: false,
})
.into_mut();
// Record path zero of the connection, if its remote address maps to a transport address.
if let Some(path) = conn.path(PathId::ZERO)
&& let Ok(socketaddr) = path.remote_address()
&& let Some(path_remote) = to_transport_addr(
socketaddr,
&self.relay_mapped_addrs,
&self.custom_mapped_addrs,
)
{
trace!(?path_remote, "added new connection");
let bias = self.transport_bias.get(&path_remote);
let path_status = bias.transport_type.to_path_status();
let res = path.set_status(path_status);
event!(
target: "iroh::_events::path::set_status",
Level::DEBUG,
remote = %self.endpoint_id.fmt_short(),
?path_remote,
?path_status,
%conn_id,
path_id = %PathId::ZERO,
?res,
);
Self::configure_path(&path, &path_remote);
conn_state.add_open_path(path_remote.clone(), PathId::ZERO, &self.metrics);
self.paths
.insert_open_path(path_remote.clone(), Source::Connection { _0: Private });
self.select_path();
// When the first path is an IP path, additionally open paths to all known relay addresses.
if path_remote.is_ip() {
let relays = self
.paths
.addrs()
.filter(|a| a.is_relay())
.cloned()
.collect::<Vec<_>>();
for remote in relays {
self.open_path(&remote);
}
}
}
self.trigger_holepunching();
}
// The requester may have gone away; ignore a failed send.
tx.send(path_watchable).ok();
}
/// Records application-provided addresses for the remote and requests resolution.
///
/// The addresses are stored with [`Source::App`], `tx` is handed to the path state via
/// `resolve_remote`, and an address lookup is started if one is needed.
fn handle_msg_resolve_remote(
    &mut self,
    addrs: BTreeSet<TransportAddr>,
    tx: oneshot::Sender<Result<(), AddressLookupFailed>>,
) {
    let endpoint_id = self.endpoint_id;
    self.paths
        .insert_multiple(to_transports_addr(endpoint_id, addrs), Source::App);
    self.paths.resolve_remote(tx);
    self.trigger_address_lookup();
}
fn handle_msg_network_change(&mut self, is_major: bool) {
for conn in self.connections.values() {
if let Some(noq_conn) = conn.handle.upgrade() {
for (path_id, addr) in &conn.paths {
if let Some(path) = noq_conn.path(*path_id) {
if let Err(err) = path.ping() {
warn!(%err, %path_id, ?addr, "failed to ping path");
}
}
}
}
}
if is_major {
self.trigger_holepunching();
}
}
/// Forgets a closed connection.
///
/// Emits the `conn::closed` event, updates the closed-connections metric if the
/// connection was still tracked, and clears the selected path once no connection is left.
fn handle_connection_close(&mut self, conn_id: ConnId, reason: ConnectionError) {
    event!(
        target: "iroh::_events::conn::closed",
        Level::DEBUG,
        %conn_id,
        remote_id = %self.endpoint_id.fmt_short(),
        ?reason,
    );
    let was_tracked = self.connections.remove(&conn_id).is_some();
    if was_tracked {
        self.metrics.num_conns_closed.inc();
    }
    if !self.connections.is_empty() {
        return;
    }
    trace!("last connection closed - clearing selected_path");
    self.selected_path.set(None).ok();
}
/// Starts an address lookup for the remote unless one is unnecessary.
///
/// Nothing happens when a path is already selected or a lookup is already running.
/// Items of the lookup that are an inner error are dropped from the stream; outer
/// errors are passed through.
fn trigger_address_lookup(&mut self) {
    let has_selected_path = self.selected_path.get().is_some();
    if has_selected_path || self.address_lookup_stream.is_some() {
        return;
    }
    let lookup = self
        .address_lookup
        .resolve(self.endpoint_id)
        .filter_map(|item| match item {
            Ok(Ok(item)) => Some(Ok(item)),
            Ok(Err(_err)) => None,
            Err(err) => Some(Err(err)),
        });
    self.address_lookup_stream = Some(Box::pin(lookup));
}
/// Processes one item of the running address lookup.
///
/// - A successful item for this remote adds its addresses to the known paths.
/// - A successful item for a different endpoint is logged and ignored.
/// - An error, or the end of the stream (`None`), finishes the lookup and drops the stream.
fn handle_address_lookup_item(
    &mut self,
    item: Option<Result<AddressLookupItem, AddressLookupFailed>>,
) {
    match item {
        Some(Ok(item)) if item.endpoint_id() == self.endpoint_id => {
            let source = Source::AddressLookup {
                name: item.provenance().to_string(),
            };
            let addrs = to_transports_addr(self.endpoint_id, item.into_endpoint_addr().addrs);
            self.paths.insert_multiple(addrs, source);
        }
        Some(Ok(item)) => {
            warn!(
                ?item,
                "Address Lookup emitted item for wrong remote endpoint"
            );
        }
        Some(Err(err)) => {
            warn!("Address Lookup failed: {err:#}");
            self.paths.address_lookup_finished(Err(err));
            self.address_lookup_stream = None;
        }
        None => {
            self.paths.address_lookup_finished(Ok(()));
            self.address_lookup_stream = None;
        }
    }
}
fn update_local_direct_address(&mut self) {
let local_addrs = self
.local_direct_addrs
.get()
.iter()
.map(|d| d.addr)
.collect::<BTreeSet<_>>();
for conn in self.connections.values().filter_map(|s| s.handle.upgrade()) {
Self::update_qnt_candidates(&conn, &local_addrs);
}
}
/// Makes the local NAT traversal addresses of `conn` equal to `direct_addrs`.
///
/// Addresses missing on the connection are added, addresses no longer in
/// `direct_addrs` are removed. Individual failures are logged and do not abort the
/// update; if the current addresses cannot be read, nothing is changed.
fn update_qnt_candidates(conn: &noq::Connection, direct_addrs: &BTreeSet<SocketAddr>) {
    let noq_candidates: BTreeSet<SocketAddr> = match conn.get_local_nat_traversal_addresses() {
        Err(err) => {
            warn!("failed to get local nat candidates: {err:#}");
            return;
        }
        Ok(addrs) => addrs.into_iter().collect(),
    };

    let to_add = direct_addrs.difference(&noq_candidates);
    for addr in to_add {
        if let Err(err) = conn.add_nat_traversal_address(*addr) {
            warn!("failed adding local addr: {err:#}",);
        }
    }

    let to_remove = noq_candidates.difference(direct_addrs);
    for addr in to_remove {
        if let Err(err) = conn.remove_nat_traversal_address(*addr) {
            warn!("failed removing local addr: {err:#}");
        }
    }

    trace!(?direct_addrs, "updated local QNT addresses");
}
/// Starts a holepunch attempt if one is currently warranted.
///
/// Holepunching is initiated on the live client-side connection with the smallest
/// [`ConnId`]. If neither the local nor the remote candidates contain an address that
/// was not part of the previous attempt, and that attempt is less than
/// `HOLEPUNCH_ATTEMPTS_INTERVAL` ago, the attempt is scheduled for later instead.
fn trigger_holepunching(&mut self) {
    if self.connections.is_empty() {
        trace!("not holepunching: no connections");
        return;
    }

    let client_conn = self
        .connections
        .iter()
        .filter_map(|(id, state)| state.handle.upgrade().map(|conn| (*id, conn)))
        .filter(|(_, conn)| conn.side().is_client())
        .min_by_key(|(id, _)| *id)
        .map(|(_, conn)| conn);
    let Some(conn) = client_conn else {
        trace!("not holepunching: no client connection");
        return;
    };

    let remote_candidates = match conn.get_remote_nat_traversal_addresses() {
        Err(err) => {
            warn!("failed to get nat candidate addresses: {err:#}");
            return;
        }
        Ok(addrs) => BTreeSet::from_iter(addrs),
    };
    let local_candidates: BTreeSet<SocketAddr> = self
        .local_direct_addrs
        .get()
        .iter()
        .map(|daddr| daddr.addr)
        .collect();

    if let Some(last_hp) = self.last_holepunch.as_ref() {
        trace!(
            ?last_hp,
            ?local_candidates,
            ?remote_candidates,
            "candidates to holepunch?"
        );
        // Candidates may disappear without consequence; only new ones justify an
        // immediate new attempt.
        let nothing_new = remote_candidates.is_subset(&last_hp.remote_candidates)
            && local_candidates.is_subset(&last_hp.local_candidates);
        if nothing_new {
            let next_hp = last_hp.when + HOLEPUNCH_ATTEMPTS_INTERVAL;
            let now = Instant::now();
            if next_hp > now {
                trace!(scheduled_in = ?(next_hp - now), "not holepunching: no new addresses");
                self.scheduled_holepunch = Some(next_hp);
                return;
            }
        }
    }

    self.do_holepunching(conn);
}
/// Unconditionally initiates a NAT traversal round on `conn`.
///
/// On success the attempt (time, local and remote candidates) is stored in
/// `last_holepunch`. On failure the error is logged; for some errors a retry is
/// scheduled shortly after.
#[instrument(skip_all)]
fn do_holepunching(&mut self, conn: noq::Connection) {
self.metrics.holepunch_attempts.inc();
// Snapshot of our own direct addresses, recorded together with the attempt.
let local_candidates = self
.local_direct_addrs
.get()
.iter()
.map(|daddr| daddr.addr)
.collect::<BTreeSet<_>>();
match conn.initiate_nat_traversal_round() {
Ok(remote_candidates) => {
// Store the remote candidates with their IP address in canonical form.
let remote_candidates = remote_candidates
.iter()
.map(|addr| SocketAddr::new(addr.ip().to_canonical(), addr.port()))
.collect();
event!(
target: "iroh::_events::qnt::init",
Level::DEBUG,
remote = %self.endpoint_id.fmt_short(),
?local_candidates,
?remote_candidates,
);
self.last_holepunch = Some(HolepunchAttempt {
when: Instant::now(),
local_candidates,
remote_candidates,
});
}
Err(err) => {
debug!("failed to initiate NAT traversal: {err:#}");
use noq_proto::n0_nat_traversal::Error;
match err {
// For these errors no retry is scheduled.
Error::Closed
| Error::TooManyAddresses
| Error::WrongConnectionSide
| Error::ExtensionNotNegotiated => {
}
// For these errors holepunching is retried after 100ms.
Error::Multipath(_) | Error::NotEnoughAddresses => {
let now = Instant::now();
let next_hp = now + Duration::from_millis(100);
trace!(scheduled_in = ?(next_hp - now), "holepunching retry");
self.scheduled_holepunch = Some(next_hp);
}
}
}
}
}
/// Applies transport-specific settings to a path.
///
/// Relay paths get `RELAY_PATH_MAX_IDLE_TIMEOUT` as their max idle timeout; a failure
/// to set it is only logged. Other paths are left untouched.
fn configure_path(path: &noq::Path, addr: &transports::Addr) {
    if !matches!(addr, transports::Addr::Relay(..)) {
        return;
    }
    if let Err(e) = path.set_max_idle_timeout(Some(RELAY_PATH_MAX_IDLE_TIMEOUT)) {
        debug!(?e, "failed to set relay path idle timeout");
    }
}
/// Ensures a path to `open_addr` exists on every live connection.
///
/// - Connections that already track a path to `open_addr` only get the path status
///   and configuration re-applied.
/// - On other connections a new path is opened, but only from the client side.
/// - If opening fails because the remote connection ids are exhausted, the address is
///   queued in `pending_open_paths` and a retry is scheduled.
///
/// An address is queued at most once. Every queued entry makes the retry call this
/// function again, and each such call visits *all* connections, so queueing one copy
/// per exhausted connection would multiply the queue on every retry.
#[instrument(level = "warn", skip(self))]
fn open_path(&mut self, open_addr: &transports::Addr) {
    let bias = self.transport_bias.get(open_addr);
    let path_status = bias.transport_type.to_path_status();
    // The socket address under which the QUIC layer knows this transport address.
    let quic_addr = match &open_addr {
        transports::Addr::Ip(socket_addr) => *socket_addr,
        transports::Addr::Relay(relay_url, eid) => self
            .relay_mapped_addrs
            .get(&(relay_url.clone(), *eid))
            .private_socket_addr(),
        transports::Addr::Custom(addr) => {
            self.custom_mapped_addrs.get(addr).private_socket_addr()
        }
    };
    for (conn_id, conn_state) in self.connections.iter_mut() {
        let Some(conn) = conn_state.handle.upgrade() else {
            continue;
        };
        // The connection already has a path to this address: just refresh its settings.
        if let Some(&path_id) = conn_state.paths_by_addr.get(open_addr)
            && let Some(path) = conn.path(path_id)
        {
            let res = path.set_status(path_status);
            event!(
                target: "iroh::_events::path::set_status",
                Level::DEBUG,
                remote = %self.endpoint_id.fmt_short(),
                ?open_addr,
                ?path_status,
                %conn_id,
                %path_id,
                ?res,
            );
            Self::configure_path(&path, open_addr);
            continue;
        }
        // New paths are only opened from the client side of a connection.
        if conn.side().is_server() {
            continue;
        }
        let fut = conn.open_path_ensure(quic_addr, path_status);
        match fut.path_id() {
            Some(path_id) => {
                trace!(%conn_id, %path_id, ?path_status, "opening new path");
                if let Some(path) = conn.path(path_id) {
                    let res = path.set_status(path_status);
                    event!(
                        target: "iroh::_events::path::set_status",
                        Level::DEBUG,
                        remote = %self.endpoint_id.fmt_short(),
                        ?open_addr,
                        ?path_status,
                        %conn_id,
                        %path_id,
                        ?res,
                    );
                    if let Err(e) = res {
                        warn!(?e, ?open_addr, ?path_status, "Setting path status failed");
                    }
                    Self::configure_path(&path, open_addr);
                }
            }
            None => {
                // No path id was assigned: poll once to learn why opening failed.
                let ret = now_or_never(fut);
                match ret {
                    Some(Err(PathError::RemoteCidsExhausted)) => {
                        self.scheduled_open_path =
                            Some(Instant::now() + Duration::from_millis(333));
                        // Queue each address only once, see the function docs.
                        if !self.pending_open_paths.contains(open_addr) {
                            self.pending_open_paths.push_back(open_addr.clone());
                        }
                        trace!(?open_addr, "scheduling open_path");
                    }
                    _ => warn!(?ret, "Opening path failed"),
                }
            }
        }
    }
}
/// Handles a path event of the connection `conn_id`.
///
/// Events for connections that are no longer tracked or no longer alive are ignored,
/// as are lagged (missed) events.
#[instrument(skip(self))]
fn handle_path_event(&mut self, conn_id: ConnId, event: Result<PathEvent, noq::Lagged>) {
let Ok(event) = event else {
warn!("missed a PathEvent, RemoteStateActor lagging");
return;
};
let Some(conn_state) = self.connections.get_mut(&conn_id) else {
trace!("event for removed connection");
return;
};
let Some(conn) = conn_state.handle.upgrade() else {
trace!("event for closed connection");
return;
};
trace!("path event");
match event {
PathEvent::Opened { id: path_id } => {
let Some(path) = conn.path(path_id) else {
trace!("path open event for unknown path");
return;
};
// Only paths whose remote address maps to a transport address are recorded.
if let Ok(socketaddr) = path.remote_address()
&& let Some(path_remote) = to_transport_addr(
socketaddr,
&self.relay_mapped_addrs,
&self.custom_mapped_addrs,
)
{
event!(
target: "iroh::_events::path::open",
Level::DEBUG,
remote = %self.endpoint_id.fmt_short(),
?path_remote,
%conn_id,
%path_id,
);
Self::configure_path(&path, &path_remote);
conn_state.add_open_path(path_remote.clone(), path_id, &self.metrics);
self.paths
.insert_open_path(path_remote.clone(), Source::Connection { _0: Private });
}
self.select_path();
}
PathEvent::Abandoned { id, .. } => {
let Some(path_remote) = conn_state.remove_path(&id) else {
debug!(%id, "path not in path_id_map");
return;
};
self.paths.abandoned_path(&path_remote);
event!(
target: "iroh::_events::path::abandoned",
Level::DEBUG,
remote = %self.endpoint_id.fmt_short(),
?path_remote,
%conn_id,
path_id = ?id,
);
// Close the paths to the same remote address on all connections still tracking one.
for (conn_id, conn_state) in self.connections.iter_mut() {
let Some(path_id) = conn_state.paths_by_addr.get(&path_remote) else {
continue;
};
let Some(conn) = conn_state.handle.upgrade() else {
continue;
};
if let Some(path) = conn.path(*path_id) {
trace!(?path_remote, %conn_id, %path_id, "closing path");
if let Err(err) = path.close() {
trace!(
?path_remote,
%conn_id,
%path_id,
"path close failed: {err:#}"
);
}
}
}
self.select_path();
}
PathEvent::Discarded { id, path_stats } => {
trace!(%id, ?path_stats, "path discarded");
}
// These events require no action from the actor.
PathEvent::RemoteStatus { .. } | PathEvent::ObservedAddr { .. } => {
}
}
}
/// Re-evaluates which path should be the selected path.
///
/// For every address the lowest RTT over all connections' paths to it is combined with
/// the transport bias and passed to [`select_best_path`]. If that yields a path, it
/// becomes the selected path, it is opened on all connections, and redundant direct
/// paths are closed. Otherwise the current selection is kept.
#[instrument(skip_all)]
fn select_path(&mut self) {
    // Collect the RTTs of all paths, grouped by remote address.
    let mut all_path_rtts: FxHashMap<transports::Addr, Vec<Duration>> = FxHashMap::default();
    for conn_state in self.connections.values() {
        let Some(conn) = conn_state.handle.upgrade() else {
            continue;
        };
        for (path_id, addr) in conn_state.paths.iter() {
            if let Some(stats) = conn.path_stats(*path_id) {
                all_path_rtts
                    .entry(addr.clone())
                    .or_default()
                    .push(stats.rtt);
            }
        }
    }
    trace!(?all_path_rtts, "dumping all path RTTs");
    // Keep the lowest RTT per address and attach the bias data used for selection.
    let path_rtts: FxHashMap<transports::Addr, PathSelectionData> = all_path_rtts
        .into_iter()
        .filter_map(|(addr, rtts)| {
            let rtt = rtts.into_iter().min()?;
            let data = self.transport_bias.path_selection_data(&addr, rtt);
            Some((addr, data))
        })
        .collect();
    let current_path = self.selected_path.get();
    let selected_path = select_best_path(path_rtts, &current_path);
    if let Some((addr, rtt)) = selected_path {
        let prev = self.selected_path.set(Some(addr.clone()));
        // Only emit the event when `set` reported `Ok`.
        if prev.is_ok() {
            event!(
                target: "iroh::_events::path::selected",
                Level::DEBUG,
                remote = %self.endpoint_id.fmt_short(),
                path_remote = ?addr,
                ?rtt,
                prev_remote = ?prev,
            );
        }
        self.open_path(&addr);
        self.close_redundant_paths(&addr);
    } else {
        trace!(?current_path, "keeping current path");
    }
}
/// Closes non-relay paths other than `selected_path` on client-side connections.
///
/// `selected_path` must be the currently selected path (checked in debug builds).
/// A connection whose number of IP paths is one or less is left untouched.
// NOTE(review): the candidates are filtered with `!is_relay()` while the guard below
// counts `is_ip()` paths; for paths that are neither relay nor IP these differ —
// confirm this is intended.
fn close_redundant_paths(&mut self, selected_path: &transports::Addr) {
debug_assert_eq!(self.selected_path.get().as_ref(), Some(selected_path));
for (conn_id, conn_state) in self.connections.iter() {
for (path_id, path_remote) in conn_state
.paths
.iter()
.filter(|(_, addr)| !addr.is_relay())
.filter(|(_, addr)| *addr != selected_path)
{
// Do not close anything if this connection has at most one IP path.
if conn_state.paths.values().filter(|a| a.is_ip()).count() <= 1 {
continue; }
// Only the client side of a connection closes paths here.
if let Some(path) = conn_state
.handle
.upgrade()
.filter(|conn| conn.side().is_client())
.and_then(|conn| conn.path(*path_id))
{
trace!(?path_remote, %conn_id, %path_id, "closing direct path");
match path.close() {
Err(noq_proto::ClosePathError::MultipathNotNegotiated) => {
error!("multipath not negotiated");
}
Err(noq_proto::ClosePathError::LastOpenPath) => {
error!("could not close last open path");
}
// The path is already closed: nothing to do.
Err(noq_proto::ClosePathError::ClosedPath) => {
}
// The returned future is not awaited here.
Ok(_fut) => {
}
}
}
}
}
}
/// Triggers holepunching unless every connection already has a good enough IP path.
///
/// A connection is good enough when it is still alive and the lowest RTT over its IP
/// paths is at most `GOOD_ENOUGH_LATENCY`. With no connections at all nothing is
/// triggered.
fn check_connections(&mut self) {
    let mut all_good_enough = true;
    for conn_state in self.connections.values() {
        let min_ip_rtt = conn_state.handle.upgrade().and_then(|conn| {
            conn_state
                .paths
                .iter()
                .filter(|(_, addr)| addr.is_ip())
                .filter_map(|(path_id, _)| conn.path_stats(*path_id))
                .map(|stats| stats.rtt)
                .min()
        });
        let conn_good_enough = min_ip_rtt.is_some_and(|rtt| rtt <= GOOD_ENOUGH_LATENCY);
        all_good_enough &= conn_good_enough;
    }
    if all_good_enough {
        return;
    }
    debug!("connections are not good enough, triggering holepunching");
    self.trigger_holepunching();
}
}
/// Decides whether, and to which path, the selected path should change.
///
/// Returns the best path of `all_paths` (lowest `sort_key`) together with its RTT when
/// - there is no current path, or the current path is not part of `all_paths`, or
/// - the best path has a different transport type than the current one, or
/// - the best path's biased RTT is at least `RTT_SWITCHING_MIN_IP` lower than the
///   current path's biased RTT.
///
/// Returns `None` when `all_paths` is empty or the current path should be kept.
fn select_best_path(
    all_paths: FxHashMap<transports::Addr, PathSelectionData>,
    current_path: &Option<transports::Addr>,
) -> Option<(transports::Addr, Duration)> {
    let (best_addr, best_data) = all_paths.iter().min_by_key(|(_, psd)| psd.sort_key())?;
    let switch_to_best = || Some((best_addr.clone(), best_data.rtt));

    let current_data = current_path
        .as_ref()
        .and_then(|addr| all_paths.get(addr));
    let Some(current_data) = current_data else {
        return switch_to_best();
    };

    // Within one transport type only switch for a significant improvement, to avoid
    // flapping between paths of similar quality.
    let should_switch = current_data.transport_type != best_data.transport_type
        || best_data.biased_rtt + RTT_SWITCHING_MIN_IP.as_nanos() as i128
            <= current_data.biased_rtt;
    if should_switch {
        switch_to_best()
    } else {
        None
    }
}
/// Returns a future that sends `owned_transmit` to `dst` using `sender`.
///
/// The future resolves once `poll_send` completes; an error is wrapped with a context
/// message naming the destination.
fn send_datagram<'a>(
sender: &'a mut TransportsSender,
dst: transports::Addr,
owned_transmit: OwnedTransmit,
) -> impl Future<Output = n0_error::Result<()>> + 'a {
std::future::poll_fn(move |cx| {
// Build a borrowed view of the owned transmit for every poll.
let transmit = transports::Transmit {
ecn: owned_transmit.ecn,
contents: owned_transmit.contents.as_ref(),
segment_size: owned_transmit.segment_size,
};
Pin::new(&mut *sender)
.poll_send(cx, &dst, None, &transmit)
.map(|res| res.with_context(|_| format!("failed to send datagram to {dst:?}")))
})
}
/// Messages handled by the [`RemoteStateActor`].
#[derive(derive_more::Debug)]
pub(crate) enum RemoteStateMessage {
/// Send a datagram to the remote: on the selected path if there is one, otherwise
/// on all known paths.
#[debug("SendDatagram(..)")]
SendDatagram(Box<TransportsSender>, OwnedTransmit),
/// Register a connection with the actor; the connection's [`PathWatchable`] is
/// sent back on the channel.
#[debug("AddConnection(..)")]
AddConnection(WeakConnectionHandle, oneshot::Sender<PathWatchable>),
/// Add the given addresses for the remote and start an address lookup if needed;
/// the channel is handed to the path state to report the result.
#[debug("ResolveRemote(..)")]
ResolveRemote(
BTreeSet<TransportAddr>,
oneshot::Sender<Result<(), AddressLookupFailed>>,
),
/// Request the current [`RemoteInfo`] of the remote.
RemoteInfo(oneshot::Sender<RemoteInfo>),
/// The local network changed; `is_major` additionally triggers holepunching.
NetworkChange { is_major: bool },
}
/// Record of an initiated holepunch attempt.
#[derive(Debug)]
struct HolepunchAttempt {
/// When the attempt was initiated.
when: Instant,
/// Our direct addresses at the time of the attempt.
local_candidates: BTreeSet<SocketAddr>,
/// The remote candidate addresses of the attempt, with canonicalized IP addresses.
remote_candidates: BTreeSet<SocketAddr>,
}
/// Identifier of a connection within the actor, created from the connection's `stable_id()`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, derive_more::Display)]
#[display("{_0}")]
struct ConnId(usize);
/// Per-connection state tracked by the [`RemoteStateActor`].
#[derive(Debug)]
struct ConnectionState {
/// Weak handle to the connection; upgrading fails once the connection is gone.
handle: WeakConnectionHandle,
/// Watchable through which the paths of this connection are exposed.
path_watchable: PathWatchable,
/// The tracked paths of this connection, by path id.
paths: FxHashMap<PathId, transports::Addr>,
/// Reverse index of `paths`: the path id tracked for a remote address.
paths_by_addr: FxHashMap<transports::Addr, PathId>,
/// Whether this connection ever had an IP path; used to count direct connections once.
has_been_direct: bool,
}
/// Closes the connection's [`PathWatchable`] when the state is dropped.
impl Drop for ConnectionState {
fn drop(&mut self) {
self.path_watchable.close();
}
}
impl ConnectionState {
    /// Starts tracking the open path `path_id` towards `remote`.
    ///
    /// Updates the per-transport path metrics, counts the connection as direct the
    /// first time it gets an IP path, records the path in both indices and, if the
    /// connection is still alive, publishes it on the path watchable.
    fn add_open_path(
        &mut self,
        remote: transports::Addr,
        path_id: PathId,
        metrics: &Arc<SocketMetrics>,
    ) {
        match &remote {
            transports::Addr::Ip(_) => {
                metrics.paths_direct.inc();
            }
            transports::Addr::Relay(_, _) => {
                metrics.paths_relay.inc();
            }
            transports::Addr::Custom(_) => {
                metrics.paths_custom.inc();
            }
        }

        let first_direct_path = remote.is_ip() && !self.has_been_direct;
        if first_direct_path {
            self.has_been_direct = true;
            metrics.num_conns_direct.inc();
        }

        self.paths.insert(path_id, remote.clone());
        self.paths_by_addr.insert(remote.clone(), path_id);

        let Some(conn) = self.handle.upgrade() else {
            return;
        };
        self.path_watchable.insert(&conn, path_id, remote.into());
    }

    /// Stops tracking `path_id` and marks it abandoned on the path watchable.
    ///
    /// Returns the remote address the path was tracked for, or `None` if the path was
    /// not tracked.
    fn remove_path(&mut self, path_id: &PathId) -> Option<transports::Addr> {
        let removed = self.paths.remove(path_id);
        if let Some(addr) = removed.as_ref() {
            self.paths_by_addr.remove(addr);
        }
        self.path_watchable.set_abandoned(*path_id);
        removed
    }
}
/// Polls `fut` exactly once with a no-op waker.
///
/// Returns `Some(output)` if the future completed on that poll and `None` if it is
/// still pending. The future is dropped either way.
fn now_or_never<T, F: Future<Output = T>>(fut: F) -> Option<T> {
    let mut cx = std::task::Context::from_waker(std::task::Waker::noop());
    let pinned = std::pin::pin!(fut);
    if let Poll::Ready(output) = pinned.poll(&mut cx) {
        Some(output)
    } else {
        None
    }
}
/// Future that resolves when a connection closes, yielding the connection's id
/// together with the close reason.
struct OnClosed {
/// Id of the connection being watched.
conn_id: ConnId,
/// The underlying close notification of the connection.
inner: noq::OnClosed,
}
impl OnClosed {
    /// Creates the close notification future for `conn`, tagged with its [`ConnId`].
    fn new(conn: &noq::Connection) -> Self {
        let conn_id = ConnId(conn.stable_id());
        let inner = conn.on_closed();
        Self { conn_id, inner }
    }
}
impl Future for OnClosed {
    type Output = (ConnId, ConnectionError);

    /// Polls the inner close notification and tags its close reason with the
    /// connection id; the second element of the inner output is discarded.
    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        let conn_id = self.conn_id;
        Pin::new(&mut self.inner)
            .poll(cx)
            .map(|(close_reason, _stats)| (conn_id, close_reason))
    }
}
/// Converts public [`TransportAddr`]s into internal [`transports::Addr`]s.
///
/// Relay URLs are paired with `endpoint_id`. Address kinds not handled here are logged
/// and skipped.
fn to_transports_addr(
    endpoint_id: EndpointId,
    addrs: impl IntoIterator<Item = TransportAddr>,
) -> impl Iterator<Item = transports::Addr> {
    addrs.into_iter().filter_map(move |addr| {
        let converted = match addr {
            TransportAddr::Ip(sockaddr) => transports::Addr::from(sockaddr),
            TransportAddr::Relay(relay_url) => transports::Addr::from((relay_url, endpoint_id)),
            TransportAddr::Custom(custom_addr) => transports::Addr::from(custom_addr),
            unsupported => {
                warn!(addr = ?unsupported, "Unsupported TransportAddr");
                return None;
            }
        };
        Some(converted)
    })
}
/// Awaits the next item of an optional stream.
///
/// Returns `None` immediately when there is no stream, otherwise `Some(item)` where
/// `item` is what the stream's `next()` produced (itself `None` at the end of the stream).
async fn maybe_next<S: Stream + Unpin>(maybe_stream: Option<&mut S>) -> Option<Option<S::Item>> {
    let stream = maybe_stream?;
    Some(stream.next().await)
}
// Unit tests for `select_best_path`.
#[cfg(test)]
mod tests {
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use super::*;
use crate::socket::transports::TransportType;
// IPv4 localhost address with the given port.
fn v4(port: u16) -> transports::Addr {
transports::Addr::Ip(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port)))
}
// IPv6 localhost address with the given port.
fn v6(port: u16) -> transports::Addr {
transports::Addr::Ip(SocketAddr::V6(SocketAddrV6::new(
Ipv6Addr::LOCALHOST,
port,
0,
0,
)))
}
// Relay address whose URL is derived from `port`, for an all-zero endpoint id.
fn relay(port: u16) -> transports::Addr {
let url = format!("https://relay{port}.iroh.computer")
.parse::<RelayUrl>()
.unwrap();
transports::Addr::Relay(url, EndpointId::from_bytes(&[0u8; 32]).unwrap())
}
// Selection data without any bias: the biased RTT equals the RTT.
fn psd(transport_type: TransportType, rtt_ms: u64) -> PathSelectionData {
let rtt = Duration::from_millis(rtt_ms);
let biased_rtt = rtt.as_nanos() as i128;
PathSelectionData {
transport_type,
rtt,
biased_rtt,
}
}
// Selection data for an IPv6 path: the biased RTT is reduced by `IPV6_RTT_ADVANTAGE`.
fn psd_v6(transport_type: TransportType, rtt_ms: u64) -> PathSelectionData {
let rtt = Duration::from_millis(rtt_ms);
let biased_rtt = rtt.as_nanos() as i128 - transports::IPV6_RTT_ADVANTAGE.as_nanos() as i128;
PathSelectionData {
transport_type,
rtt,
biased_rtt,
}
}
#[test]
fn test_ipv6_wins_over_ipv4_within_bias() {
// Equal RTT: IPv6 wins.
let mut paths = FxHashMap::default();
paths.insert(v4(1), psd(TransportType::Primary, 10));
paths.insert(v6(1), psd_v6(TransportType::Primary, 10));
let result = select_best_path(paths, &None);
assert!(result.is_some());
let (addr, _) = result.unwrap();
assert!(matches!(addr, transports::Addr::Ip(SocketAddr::V6(_))));
// IPv6 is 2ms slower: expected to still win (presumably within the IPv6 advantage).
let mut paths = FxHashMap::default();
paths.insert(v4(1), psd(TransportType::Primary, 10));
paths.insert(v6(1), psd_v6(TransportType::Primary, 12));
let result = select_best_path(paths, &None);
assert!(result.is_some());
let (addr, _) = result.unwrap();
assert!(matches!(addr, transports::Addr::Ip(SocketAddr::V6(_))));
// IPv6 is 10ms slower: IPv4 is expected to win.
let mut paths = FxHashMap::default();
paths.insert(v4(1), psd(TransportType::Primary, 10));
paths.insert(v6(1), psd_v6(TransportType::Primary, 20));
let result = select_best_path(paths, &None);
assert!(result.is_some());
let (addr, _) = result.unwrap();
assert!(matches!(addr, transports::Addr::Ip(SocketAddr::V4(_))));
}
#[test]
fn test_available_wins_over_backup_regardless_of_rtt() {
// A primary IP path beats a backup relay path even with a 10x higher RTT ...
let mut paths = FxHashMap::default();
paths.insert(v4(1), psd(TransportType::Primary, 100)); paths.insert(relay(1), psd(TransportType::Backup, 10));
let result = select_best_path(paths, &None);
assert!(result.is_some());
let (addr, _) = result.unwrap();
assert!(addr.is_ip());
// ... and even with a 1000x higher RTT.
let mut paths = FxHashMap::default();
paths.insert(v4(1), psd(TransportType::Primary, 1000));
paths.insert(relay(1), psd(TransportType::Backup, 1));
let result = select_best_path(paths, &None);
assert!(result.is_some());
let (addr, _) = result.unwrap();
assert!(addr.is_ip());
}
#[test]
fn test_same_category_only_switches_with_significant_rtt_diff() {
let current = v4(1);
// 2ms better: below the switching threshold, keep the current path.
let mut paths = FxHashMap::default();
paths.insert(v4(1), psd(TransportType::Primary, 20));
paths.insert(v4(2), psd(TransportType::Primary, 18));
let result = select_best_path(paths, &Some(current.clone()));
assert!(result.is_none());
// 4ms better: still below the threshold.
let mut paths = FxHashMap::default();
paths.insert(v4(1), psd(TransportType::Primary, 20));
paths.insert(v4(2), psd(TransportType::Primary, 16));
let result = select_best_path(paths, &Some(current.clone()));
assert!(result.is_none());
// Exactly 5ms better: the threshold is inclusive, switch.
let mut paths = FxHashMap::default();
paths.insert(v4(1), psd(TransportType::Primary, 20));
paths.insert(v4(2), psd(TransportType::Primary, 15));
let result = select_best_path(paths, &Some(current.clone()));
assert!(result.is_some());
let (addr, _) = result.unwrap();
assert_eq!(addr, v4(2));
// 6ms better: switch.
let mut paths = FxHashMap::default();
paths.insert(v4(1), psd(TransportType::Primary, 20));
paths.insert(v4(2), psd(TransportType::Primary, 14));
let result = select_best_path(paths, &Some(current.clone()));
assert!(result.is_some());
let (addr, _) = result.unwrap();
assert_eq!(addr, v4(2));
}
#[test]
fn test_no_current_path_selects_best() {
// Without a current path the path with the lowest RTT is selected.
let mut paths = FxHashMap::default();
paths.insert(v4(1), psd(TransportType::Primary, 20));
paths.insert(v4(2), psd(TransportType::Primary, 10));
let result = select_best_path(paths, &None);
assert!(result.is_some());
let (addr, _) = result.unwrap();
assert_eq!(addr, v4(2)); }
#[test]
fn test_empty_paths_returns_none() {
// No paths and no current path.
let paths: FxHashMap<transports::Addr, PathSelectionData> = FxHashMap::default();
let result = select_best_path(paths, &None);
assert!(result.is_none());
// No paths, but a current path that is no longer known.
let paths: FxHashMap<transports::Addr, PathSelectionData> = FxHashMap::default();
let result = select_best_path(paths, &Some(v4(1)));
assert!(result.is_none());
}
}