use std::{
collections::{BTreeSet, VecDeque},
net::SocketAddr,
pin::Pin,
sync::Arc,
task::Poll,
};
use iroh_base::{CustomAddr, EndpointId, RelayUrl, TransportAddr};
use n0_error::StackResultExt;
use n0_future::{
FuturesUnordered, MaybeFuture, MergeUnbounded, Stream, StreamExt,
boxed::BoxStream,
task::JoinSet,
time::{self, Duration, Instant},
};
use n0_watcher::Watcher;
use noq::{Closed, PathStats, PathStatus, WeakConnectionHandle};
use noq_proto::{PathError, PathEvent as NoqPathEvent, PathId, n0_nat_traversal};
use rustc_hash::FxHashMap;
use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, Level, Span, debug, error, event, info_span, instrument, trace, warn};
use self::path_state::RemotePathState;
pub(crate) use self::path_watcher::PathStateReceiver;
pub use self::{
path_watcher::{Path, PathEvent, PathEventStream, PathList, PathListIter, PathListStream},
remote_info::{RemoteInfo, TransportAddrInfo, TransportAddrUsage},
};
use super::Source;
use crate::{
address_lookup::{AddressLookupFailed, AddressLookupServices, Item as AddressLookupItem},
endpoint::DirectAddr,
socket::{
Metrics as SocketMetrics, RELAY_PATH_MAX_IDLE_TIMEOUT,
mapped_addrs::{AddrMap, CustomMappedAddr, RelayMappedAddr},
remote_map::remote_state::path_watcher::PathStateSender,
transports::{self, OwnedTransmit, TransportsSender},
},
};
mod path_state;
mod path_watcher;
mod remote_info;
const HOLEPUNCH_ATTEMPTS_INTERVAL: Duration = Duration::from_secs(5);
const GOOD_ENOUGH_LATENCY: Duration = Duration::from_millis(10);
const UPGRADE_INTERVAL: Duration = Duration::from_secs(60);
const ACTOR_MAX_IDLE_TIMEOUT: Duration = Duration::from_secs(60);
type PathEvents = MergeUnbounded<
Pin<Box<dyn Stream<Item = (ConnId, Result<NoqPathEvent, noq::Lagged>)> + Send + Sync>>,
>;
type AddrEvents = MergeUnbounded<
Pin<
Box<
dyn Stream<Item = (ConnId, Result<n0_nat_traversal::Event, noq::Lagged>)> + Send + Sync,
>,
>,
>;
pub(super) struct RemoteStateActor {
connections: FxHashMap<ConnId, ConnectionState>,
state: State,
}
struct State {
endpoint_id: EndpointId,
metrics: Arc<SocketMetrics>,
local_direct_addrs: n0_watcher::Direct<BTreeSet<DirectAddr>>,
relay_mapped_addrs: AddrMap<(RelayUrl, EndpointId), RelayMappedAddr>,
custom_mapped_addrs: AddrMap<CustomAddr, CustomMappedAddr>,
address_lookup: AddressLookupServices,
connections_close: FuturesUnordered<OnClosed>,
path_events: PathEvents,
addr_events: AddrEvents,
paths: RemotePathState,
last_holepunch: Option<HolepunchAttempt>,
selected_path: Option<transports::FourTuple>,
scheduled_holepunch: Option<Instant>,
scheduled_open_path: Option<Instant>,
pending_open_paths: VecDeque<transports::FourTuple>,
address_lookup_stream: Option<BoxStream<Result<AddressLookupItem, AddressLookupFailed>>>,
path_selector: Arc<dyn PathSelector>,
}
impl RemoteStateActor {
#[allow(clippy::too_many_arguments)]
pub(super) fn new(
endpoint_id: EndpointId,
local_direct_addrs: n0_watcher::Direct<BTreeSet<DirectAddr>>,
relay_mapped_addrs: AddrMap<(RelayUrl, EndpointId), RelayMappedAddr>,
custom_mapped_addrs: AddrMap<CustomAddr, CustomMappedAddr>,
metrics: Arc<SocketMetrics>,
address_lookup: AddressLookupServices,
path_selector: Arc<dyn PathSelector>,
) -> Self {
Self {
connections: FxHashMap::default(),
state: State {
endpoint_id,
metrics: metrics.clone(),
local_direct_addrs,
relay_mapped_addrs,
custom_mapped_addrs,
address_lookup,
connections_close: Default::default(),
path_events: Default::default(),
addr_events: Default::default(),
paths: RemotePathState::new(metrics),
last_holepunch: None,
selected_path: Default::default(),
scheduled_holepunch: None,
scheduled_open_path: None,
pending_open_paths: VecDeque::new(),
address_lookup_stream: None,
path_selector,
},
}
}
pub(super) fn start(
self,
initial_msgs: Vec<RemoteStateMessage>,
tasks: &mut JoinSet<(EndpointId, Vec<RemoteStateMessage>)>,
shutdown_token: CancellationToken,
parent_span: Span,
) -> mpsc::Sender<RemoteStateMessage> {
let (tx, rx) = mpsc::channel(16);
let endpoint_id = self.state.endpoint_id;
tasks.spawn(
self.run(initial_msgs, rx, shutdown_token)
.instrument(info_span!(
parent: parent_span,
"RemoteStateActor",
remote = %endpoint_id.fmt_short(),
)),
);
tx
}
async fn run(
mut self,
initial_msgs: Vec<RemoteStateMessage>,
mut inbox: mpsc::Receiver<RemoteStateMessage>,
shutdown_token: CancellationToken,
) -> (EndpointId, Vec<RemoteStateMessage>) {
trace!("actor started");
for msg in initial_msgs {
self.handle_message(msg).await;
}
let idle_timeout = time::sleep(ACTOR_MAX_IDLE_TIMEOUT);
n0_future::pin!(idle_timeout);
let check_connections = time::interval(UPGRADE_INTERVAL);
n0_future::pin!(check_connections);
loop {
let scheduled_path_open = match self.state.scheduled_open_path {
Some(when) => MaybeFuture::Some(time::sleep_until(when)),
None => MaybeFuture::None,
};
n0_future::pin!(scheduled_path_open);
let scheduled_hp = match self.state.scheduled_holepunch {
Some(when) => MaybeFuture::Some(time::sleep_until(when)),
None => MaybeFuture::None,
};
n0_future::pin!(scheduled_hp);
if !self.is_idle(&inbox) {
idle_timeout
.as_mut()
.reset(Instant::now() + ACTOR_MAX_IDLE_TIMEOUT);
}
tokio::select! {
biased;
_ = shutdown_token.cancelled() => {
trace!("actor cancelled");
break;
}
msg = inbox.recv() => {
match msg {
Some(msg) => self.handle_message(msg).await,
None => break,
}
}
Some((id, evt)) = self.state.path_events.next() => {
self.handle_path_event(id, evt);
}
Some((id, evt)) = self.state.addr_events.next() => {
trace!(?id, ?evt, "remote addrs updated, triggering holepunching");
self.trigger_holepunching();
}
Some((conn_id, closed)) = self.state.connections_close.next(), if !self.state.connections_close.is_empty() => {
self.handle_connection_close(conn_id, closed);
}
res = self.state.local_direct_addrs.updated() => {
if let Err(n0_watcher::Disconnected) = res {
trace!("direct address watcher disconnected, shutting down");
break;
}
self.update_local_direct_address();
trace!("local addrs updated, triggering holepunching");
self.trigger_holepunching();
}
_ = &mut scheduled_path_open => {
trace!("triggering scheduled path_open");
self.state.scheduled_open_path = None;
let mut addrs = std::mem::take(&mut self.state.pending_open_paths);
while let Some(addr) = addrs.pop_front() {
self.open_path_on_all_conns(&addr);
}
}
_ = &mut scheduled_hp => {
trace!("triggering scheduled holepunching");
self.state.scheduled_holepunch = None;
self.trigger_holepunching();
}
Some(item) = maybe_next(self.state.address_lookup_stream.as_mut()), if self.state.address_lookup_stream.is_some() => {
self.state.handle_address_lookup_item(item);
}
_ = check_connections.tick() => {
self.check_connections();
}
_ = &mut idle_timeout => {
if self.is_idle(&inbox) {
trace!("idle timeout expired and still idle: terminate actor");
break;
} else {
idle_timeout.as_mut().reset(Instant::now() + ACTOR_MAX_IDLE_TIMEOUT);
}
}
}
}
inbox.close();
let mut leftover_msgs = Vec::with_capacity(inbox.len());
inbox.recv_many(&mut leftover_msgs, inbox.len()).await;
trace!("actor terminating");
(self.state.endpoint_id, leftover_msgs)
}
fn is_idle(&self, inbox: &mpsc::Receiver<RemoteStateMessage>) -> bool {
self.connections.is_empty()
&& inbox.is_empty()
&& self.state.paths.resolve_requests_is_empty()
}
#[instrument(skip(self))]
async fn handle_message(&mut self, msg: RemoteStateMessage) {
match msg {
RemoteStateMessage::SendDatagram(sender, transmit) => {
self.state.handle_msg_send_datagram(sender, transmit).await;
}
RemoteStateMessage::AddConnection(handle, tx) => {
self.handle_msg_add_connection(handle, tx);
}
RemoteStateMessage::ResolveRemote(addrs, tx) => {
self.state.handle_msg_resolve_remote(addrs, tx);
}
RemoteStateMessage::RemoteInfo(tx) => {
let addrs = self.state.paths.to_remote_addrs();
let info = RemoteInfo {
endpoint_id: self.state.endpoint_id,
addrs,
};
tx.send(info).ok();
}
RemoteStateMessage::NetworkChange { is_major } => {
self.handle_msg_network_change(is_major);
}
}
}
fn handle_msg_add_connection(
&mut self,
conn: noq::Connection,
tx: oneshot::Sender<PathStateReceiver>,
) {
let (path_state_sender, path_state_receiver) = PathStateSender::new();
self.state.metrics.num_conns_opened.inc();
let conn_id = ConnId(conn.stable_id());
self.connections.remove(&conn_id);
self.state
.path_events
.push(Box::pin(conn.path_events().map(move |evt| (conn_id, evt))));
self.state.addr_events.push(Box::pin(
conn.nat_traversal_updates().map(move |evt| (conn_id, evt)),
));
self.state.connections_close.push(OnClosed::new(&conn));
let local_addrs = self.state.local_candidates();
update_qnt_candidates(&conn, &local_addrs);
let conn_state = self
.connections
.entry(conn_id)
.insert_entry(ConnectionState {
handle: conn.weak_handle(),
path_state: path_state_sender,
paths: Default::default(),
has_been_direct: false,
})
.into_mut();
if let Some(path) = conn.path(PathId::ZERO) {
let path_remote = self
.state
.register_and_configure_path(conn_id, conn_state, &path);
if let Some(path_remote) = path_remote
&& !path_remote.is_relay()
&& conn.side().is_client()
{
let relays = self
.state
.paths
.addrs()
.filter(|addr| addr.is_relay())
.map(|addr| transports::FourTuple::from_remote(addr.clone()))
.collect::<Vec<_>>();
for open_addr in relays {
self.state
.open_path_on_conn(conn_id, conn_state, &conn, &open_addr);
}
}
}
self.trigger_holepunching();
self.select_path();
tx.send(path_state_receiver).ok();
}
fn handle_msg_network_change(&mut self, is_major: bool) {
for conn in self.connections.values() {
if let Some(noq_conn) = conn.handle.upgrade() {
for (path_id, addr) in &conn.paths {
if let Some(path) = noq_conn.path(*path_id) {
if let Err(err) = path.ping() {
warn!(%err, %path_id, ?addr, "failed to ping path");
}
}
}
}
}
if is_major {
self.trigger_holepunching();
}
}
fn handle_connection_close(&mut self, conn_id: ConnId, closed: Closed) {
event!(
target: "iroh::_events::conn::closed",
Level::DEBUG,
%conn_id,
remote_id = %self.state.endpoint_id.fmt_short(),
reason=?closed.reason,
);
if let Some(conn_state) = self.connections.remove(&conn_id) {
self.state.metrics.num_conns_closed.inc();
conn_state.path_state.close(closed);
}
if self.connections.is_empty() {
trace!("last connection closed - clearing selected_path");
self.state.selected_path = None;
}
}
fn update_local_direct_address(&mut self) {
let local_addrs = self.state.local_candidates();
for conn in self.connections.values().filter_map(|s| s.handle.upgrade()) {
update_qnt_candidates(&conn, &local_addrs);
}
}
fn trigger_holepunching(&mut self) {
if self.connections.is_empty() {
trace!("not holepunching: no connections");
return;
}
let Some(conn) = self
.connections
.iter()
.filter_map(|(id, state)| state.handle.upgrade().map(|conn| (*id, conn)))
.filter(|(_, conn)| conn.side().is_client())
.min_by_key(|(id, _)| *id)
.map(|(_, conn)| conn)
else {
trace!("not holepunching: no client connection");
return;
};
let remote_candidates = match conn.get_remote_nat_traversal_addresses() {
Ok(addrs) => BTreeSet::from_iter(addrs),
Err(err) => {
warn!("failed to get nat candidate addresses: {err:#}");
return;
}
};
let local_candidates = self.state.local_candidates();
let new_candidates = self
.state
.last_holepunch
.as_ref()
.map(|last_hp| {
trace!(
?last_hp,
?local_candidates,
?remote_candidates,
"candidates to holepunch?"
);
!remote_candidates.is_subset(&last_hp.remote_candidates)
|| !local_candidates.is_subset(&last_hp.local_candidates)
})
.unwrap_or(true);
if !new_candidates && let Some(ref last_hp) = self.state.last_holepunch {
let next_hp = last_hp.when + HOLEPUNCH_ATTEMPTS_INTERVAL;
let now = Instant::now();
if next_hp > now {
trace!(scheduled_in = ?(next_hp - now), "not holepunching: no new addresses");
self.state.scheduled_holepunch = Some(next_hp);
return;
}
}
self.state.do_holepunching(conn);
}
#[instrument(skip(self))]
fn handle_path_event(&mut self, conn_id: ConnId, event: Result<NoqPathEvent, noq::Lagged>) {
let Ok(event) = event else {
warn!("missed a PathEvent, RemoteStateActor lagging");
return;
};
let Some(conn_state) = self.connections.get_mut(&conn_id) else {
trace!("event for removed connection");
return;
};
let Some(conn) = conn_state.handle.upgrade() else {
trace!("event for closed connection");
return;
};
trace!("path event");
match event {
NoqPathEvent::Established { id: path_id, .. } => {
let Some(path) = conn.path(path_id) else {
trace!("path open event for unknown path");
return;
};
self.state
.register_and_configure_path(conn_id, conn_state, &path);
self.select_path();
}
NoqPathEvent::Abandoned { id, reason, .. } => {
let Some(network_path) = conn_state.remove_path(&id, &conn) else {
debug!(%id, "path not in path_id_map");
return;
};
if !conn_state
.paths
.values()
.any(|tuple| tuple.remote() == network_path.remote())
{
self.state.paths.abandoned_path(&network_path.remote());
}
event!(
target: "iroh::_events::path::abandoned",
Level::DEBUG,
remote = %self.state.endpoint_id.fmt_short(),
%conn_id,
path_id = %id,
%network_path,
?reason
);
self.select_path();
}
NoqPathEvent::Discarded { id, path_stats, .. } => {
trace!(%id, ?path_stats, "path discarded");
}
NoqPathEvent::RemoteStatus { .. } | NoqPathEvent::ObservedAddr { .. } => {
}
_ => {
#[cfg(test)]
panic!("Unhandled path event: {event:?}");
}
}
}
#[instrument(skip_all)]
fn select_path(&mut self) {
let current_path = self.state.selected_path.as_ref();
let selected_addr = {
let ctx = PathSelectionContext::new(current_path, &self.connections);
self.state.path_selector.select(&ctx).selected().cloned()
};
if let Some(addr) = selected_addr
&& self.state.selected_path.as_ref() != Some(&addr)
{
let prev_remote = self.state.selected_path.replace(addr.clone());
event!(
target: "iroh::_events::path::selected",
Level::DEBUG,
remote = %self.state.endpoint_id.fmt_short(),
network_path = %addr,
prev_network_path = %prev_remote.map(|p| format!("{p}")).unwrap_or("None".to_string()),
);
} else {
trace!(?current_path, "keeping current path");
}
self.apply_selected_path();
}
fn apply_selected_path(&mut self) {
let Some(selected) = self.state.selected_path.clone() else {
return;
};
for (conn_id, conn_state) in self.connections.iter() {
let Some(conn) = conn_state.handle.upgrade() else {
continue;
};
self.state
.open_path_on_conn(*conn_id, conn_state, &conn, &selected);
for (path_id, path_remote) in conn_state.paths.iter() {
let Some(path) = conn.path(*path_id) else {
continue;
};
if conn.side().is_client()
&& path_remote.is_ip()
&& path_remote != &selected
&& conn_state.paths.values().filter(|a| a.is_ip()).count() > 1
{
trace!(?path_remote, %conn_id, %path_id, "closing direct path");
match path.close() {
Err(noq_proto::ClosePathError::MultipathNotNegotiated) => {
error!("multipath not negotiated");
}
Err(noq_proto::ClosePathError::LastOpenPath) => {
error!("could not close last open path");
}
Err(noq_proto::ClosePathError::ClosedPath) => {
}
Ok(()) => {}
}
continue;
}
self.state.set_path_status(*conn_id, &path, path_remote);
}
conn_state.path_state.record_selected(&selected);
}
}
fn open_path_on_all_conns(&mut self, open_addr: &transports::FourTuple) {
for (conn_id, conn_state) in self.connections.iter() {
let Some(conn) = conn_state.handle.upgrade() else {
continue;
};
self.state
.open_path_on_conn(*conn_id, conn_state, &conn, open_addr);
}
}
fn check_connections(&mut self) {
let mut is_goodenough = true;
for conn_state in self.connections.values() {
let mut is_conn_goodenough = false;
if let Some(conn) = conn_state.handle.upgrade() {
let min_ip_rtt = conn_state
.paths
.iter()
.filter_map(|(path_id, addr)| {
if addr.is_ip() {
conn.path_stats(*path_id).map(|stats| stats.rtt)
} else {
None
}
})
.min();
if let Some(min_ip_rtt) = min_ip_rtt {
let is_latency_goodenough = min_ip_rtt <= GOOD_ENOUGH_LATENCY;
is_conn_goodenough = is_latency_goodenough;
} else {
is_conn_goodenough = false;
}
}
is_goodenough &= is_conn_goodenough;
}
if !is_goodenough {
debug!("connections are not good enough, triggering holepunching");
self.trigger_holepunching();
}
}
}
impl State {
async fn handle_msg_send_datagram(
&mut self,
mut sender: Box<TransportsSender>,
transmit: OwnedTransmit,
) {
if let Some(addr) = self.selected_path.as_ref() {
trace!(?addr, "sending datagram to selected path");
let four_tuple = transports::FourTuple::from_remote(addr.remote());
if let Err(err) = send_datagram(&mut sender, four_tuple, transmit).await {
debug!(?addr, "failed to send datagram on selected_path: {err:#}");
}
} else {
trace!(
paths = ?self.paths.addrs().collect::<Vec<_>>(),
"sending datagram to all known paths",
);
if self.paths.is_empty() {
warn!("Cannot send datagrams: No paths to remote endpoint known");
}
for addr in self.paths.addrs() {
if let transports::Addr::Ip(sockaddr) = addr
&& self
.local_direct_addrs
.peek()
.iter()
.any(|a| a.addr == *sockaddr)
{
trace!(%sockaddr, "not sending datagram to our own address");
} else if let Err(err) = send_datagram(
&mut sender,
transports::FourTuple::from_remote(addr.clone()),
transmit.clone(),
)
.await
{
debug!(?addr, "failed to send datagram: {err:#}");
}
}
}
}
fn handle_msg_resolve_remote(
&mut self,
addrs: BTreeSet<TransportAddr>,
tx: oneshot::Sender<Result<(), AddressLookupFailed>>,
) {
let addrs = to_transports_addr(self.endpoint_id, addrs);
self.paths.insert_multiple(addrs, Source::App);
self.paths.resolve_remote(tx);
self.trigger_address_lookup();
}
fn trigger_address_lookup(&mut self) {
if self.selected_path.is_some() || self.address_lookup_stream.is_some() {
return;
}
let stream = self.address_lookup.resolve(self.endpoint_id);
let stream = stream.filter_map(|item| match item {
Ok(Err(_err)) => None,
Ok(Ok(item)) => Some(Ok(item)),
Err(err) => Some(Err(err)),
});
self.address_lookup_stream = Some(Box::pin(stream));
}
fn handle_address_lookup_item(
&mut self,
item: Option<Result<AddressLookupItem, AddressLookupFailed>>,
) {
match item {
None => {
self.paths.address_lookup_finished(Ok(()));
self.address_lookup_stream = None;
}
Some(Err(err)) => {
if let AddressLookupFailed::NoServiceConfigured { .. } = err {
trace!("Address Lookup not configured");
} else {
debug!("Address Lookup failed: {err:#}");
}
self.paths.address_lookup_finished(Err(err));
self.address_lookup_stream = None;
}
Some(Ok(item)) => {
if item.endpoint_id() != self.endpoint_id {
warn!(
?item,
"Address Lookup emitted item for wrong remote endpoint"
);
} else {
let source = Source::AddressLookup {
name: item.provenance().to_string(),
};
let addrs =
to_transports_addr(self.endpoint_id, item.into_endpoint_addr().addrs);
self.paths.insert_multiple(addrs, source);
}
}
}
}
#[instrument(skip_all)]
fn do_holepunching(&mut self, conn: noq::Connection) {
self.metrics.holepunch_attempts.inc();
let local_candidates = self.local_candidates();
match conn.initiate_nat_traversal_round() {
Ok(remote_candidates) => {
let remote_candidates = remote_candidates
.iter()
.map(|addr| SocketAddr::new(addr.ip().to_canonical(), addr.port()))
.collect();
event!(
target: "iroh::_events::qnt::init",
Level::DEBUG,
remote = %self.endpoint_id.fmt_short(),
?local_candidates,
?remote_candidates,
);
self.last_holepunch = Some(HolepunchAttempt {
when: Instant::now(),
local_candidates,
remote_candidates,
});
}
Err(err) => {
debug!("failed to initiate NAT traversal: {err:#}");
use noq_proto::n0_nat_traversal::Error;
match err {
Error::Closed
| Error::TooManyAddresses
| Error::WrongConnectionSide
| Error::ExtensionNotNegotiated => {
}
Error::Multipath(_) | Error::NotEnoughAddresses => {
let now = Instant::now();
let next_hp = now + Duration::from_millis(100);
trace!(scheduled_in = ?(next_hp - now), "holepunching retry");
self.scheduled_holepunch = Some(next_hp);
}
}
}
}
}
fn register_and_configure_path(
&mut self,
conn_id: ConnId,
conn_state: &mut ConnectionState,
path: &noq::Path,
) -> Option<transports::FourTuple> {
let network_path = self.transport_tuple_for_path(path)?;
event!(
target: "iroh::_events::path::open",
Level::DEBUG,
remote = %self.endpoint_id.fmt_short(),
%conn_id,
path_id=%path.id(),
%network_path,
);
conn_state.add_open_path(network_path.clone(), path.id(), &self.metrics);
if network_path.is_relay()
&& let Err(e) = path.set_max_idle_timeout(Some(RELAY_PATH_MAX_IDLE_TIMEOUT))
{
debug!(?e, "failed to set relay path idle timeout");
}
self.set_path_status(conn_id, path, &network_path);
self.paths
.insert_open_path(network_path.remote(), Source::Connection);
Some(network_path)
}
fn set_path_status(
&mut self,
conn_id: ConnId,
path: &noq::Path,
network_path: &transports::FourTuple,
) {
let status = self.path_status_for_addr(network_path);
match path.set_status(status) {
Err(error) => warn!(?error, ?network_path, ?status, "set_status failed"),
Ok(prev_status) if prev_status != status => {
event!(
target: "iroh::_events::path::set_status",
Level::DEBUG,
remote = %self.endpoint_id.fmt_short(),
%conn_id,
path_id=%path.id(),
%network_path,
?status,
?prev_status,
);
}
Ok(_) => {}
}
}
fn open_path_on_conn(
&mut self,
conn_id: ConnId,
conn_state: &ConnectionState,
conn: &noq::Connection,
open_addr: &transports::FourTuple,
) {
if conn.side().is_server() {
return;
}
if conn_state.paths.values().any(|a| a == open_addr) {
return;
}
let quic_addr =
open_addr.to_noq_four_tuple(&self.relay_mapped_addrs, &self.custom_mapped_addrs);
let path_status = self.path_status_for_addr(open_addr);
let fut = conn.open_path_ensure(quic_addr, path_status);
match fut.path_id() {
Some(path_id) => {
trace!(%conn_id, %path_id, ?path_status, "opening new path");
}
None => {
let ret = now_or_never(fut);
match ret {
Some(Err(PathError::RemoteCidsExhausted))
| Some(Err(PathError::MaxPathIdReached)) => {
self.scheduled_open_path =
Some(Instant::now() + Duration::from_millis(333));
self.pending_open_paths.push_back(open_addr.clone());
trace!(?open_addr, ?ret, "scheduling open_path");
}
_ => warn!(?ret, "Opening path failed"),
}
}
}
}
fn path_status_for_addr(&self, addr: &transports::FourTuple) -> PathStatus {
if Some(addr) == self.selected_path.as_ref() {
PathStatus::Available
} else {
PathStatus::Backup
}
}
fn transport_tuple_for_path(&self, path: &noq::Path) -> Option<transports::FourTuple> {
let noq_network_path = path.network_path().ok()?;
transports::FourTuple::from_noq(
noq_network_path,
&self.relay_mapped_addrs,
&self.custom_mapped_addrs,
)
}
fn local_candidates(&mut self) -> BTreeSet<SocketAddr> {
self.local_direct_addrs
.get()
.iter()
.map(|d| d.addr)
.collect()
}
}
fn update_qnt_candidates(conn: &noq::Connection, direct_addrs: &BTreeSet<SocketAddr>) {
let noq_candidates = match conn.get_local_nat_traversal_addresses() {
Ok(addrs) => BTreeSet::from_iter(addrs),
Err(err) => {
warn!("failed to get local nat candidates: {err:#}");
return;
}
};
for addr in direct_addrs.difference(&noq_candidates) {
if let Err(err) = conn.add_nat_traversal_address(*addr) {
warn!("failed adding local addr: {err:#}",);
}
}
for addr in noq_candidates.difference(direct_addrs) {
if let Err(err) = conn.remove_nat_traversal_address(*addr) {
warn!("failed removing local addr: {err:#}");
}
}
trace!(?direct_addrs, "updated local QNT addresses");
}
fn send_datagram<'a>(
sender: &'a mut TransportsSender,
addr: transports::FourTuple,
owned_transmit: OwnedTransmit,
) -> impl Future<Output = n0_error::Result<()>> + 'a {
std::future::poll_fn(move |cx| {
let transmit = transports::Transmit {
ecn: owned_transmit.ecn,
contents: owned_transmit.contents.as_ref(),
segment_size: owned_transmit.segment_size,
};
Pin::new(&mut *sender)
.poll_send(cx, &addr, &transmit)
.map(|res| res.with_context(|_| format!("failed to send datagram to {:?}", addr)))
})
}
#[derive(derive_more::Debug)]
pub(crate) enum RemoteStateMessage {
#[debug("SendDatagram(..)")]
SendDatagram(Box<TransportsSender>, OwnedTransmit),
#[debug("AddConnection({})", _0.stable_id())]
AddConnection(noq::Connection, oneshot::Sender<PathStateReceiver>),
#[debug("ResolveRemote(..)")]
ResolveRemote(
BTreeSet<TransportAddr>,
oneshot::Sender<Result<(), AddressLookupFailed>>,
),
RemoteInfo(oneshot::Sender<RemoteInfo>),
NetworkChange { is_major: bool },
}
#[derive(Debug)]
struct HolepunchAttempt {
when: Instant,
local_candidates: BTreeSet<SocketAddr>,
remote_candidates: BTreeSet<SocketAddr>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, derive_more::Display)]
#[display("{_0}")]
struct ConnId(usize);
#[derive(Debug)]
struct ConnectionState {
handle: WeakConnectionHandle,
path_state: PathStateSender,
paths: FxHashMap<PathId, transports::FourTuple>,
has_been_direct: bool,
}
impl ConnectionState {
fn add_open_path(
&mut self,
network_path: transports::FourTuple,
path_id: PathId,
metrics: &Arc<SocketMetrics>,
) {
match network_path {
transports::FourTuple::Ip { .. } => metrics.paths_direct.inc(),
transports::FourTuple::Relay { .. } => metrics.paths_relay.inc(),
transports::FourTuple::Custom { .. } => metrics.paths_custom.inc(),
};
if !self.has_been_direct && network_path.is_ip() {
self.has_been_direct = true;
metrics.num_conns_direct.inc();
}
self.paths.insert(path_id, network_path.clone());
if let Some(conn) = self.handle.upgrade()
&& let Some(path) = conn.path(path_id)
{
let handle = path.weak_handle();
self.path_state.record_opened(handle, network_path);
}
}
fn remove_path(
&mut self,
path_id: &PathId,
conn: &noq::Connection,
) -> Option<transports::FourTuple> {
let addr = self.paths.remove(path_id)?;
self.path_state.record_abandoned(*path_id, conn);
Some(addr)
}
}
#[derive(Debug)]
#[cfg_attr(not(feature = "unstable-custom-transports"), allow(unreachable_pub))]
pub struct PathSelectionContext<'a> {
current: Option<&'a transports::FourTuple>,
source: PathsSource<'a>,
}
#[derive(Debug)]
enum PathsSource<'a> {
Live(&'a FxHashMap<ConnId, ConnectionState>),
#[cfg(test)]
Test(Vec<PathSelectionData<'a>>),
}
#[cfg_attr(not(feature = "unstable-custom-transports"), allow(unreachable_pub))]
impl<'a> PathSelectionContext<'a> {
fn new(
current: Option<&'a transports::FourTuple>,
connections: &'a FxHashMap<ConnId, ConnectionState>,
) -> Self {
Self {
current,
source: PathsSource::Live(connections),
}
}
#[cfg(test)]
pub(crate) fn for_test(
current: Option<&'a transports::FourTuple>,
paths: Vec<PathSelectionData<'a>>,
) -> Self {
Self {
current,
source: PathsSource::Test(paths),
}
}
pub fn current(&self) -> Option<&transports::FourTuple> {
self.current
}
pub fn paths(&self) -> Box<dyn Iterator<Item = PathSelectionData<'a>> + '_> {
match &self.source {
PathsSource::Live(connections) => Box::new(
connections
.values()
.filter_map(|state| state.handle.upgrade().map(|conn| (state, conn)))
.flat_map(|(state, conn)| {
state.paths.iter().map(move |(path_id, addr)| {
PathSelectionData::live(addr, *path_id, conn.clone())
})
}),
),
#[cfg(test)]
PathsSource::Test(paths) => Box::new(paths.iter().cloned()),
}
}
}
#[cfg_attr(not(feature = "unstable-custom-transports"), allow(unreachable_pub))]
#[derive(derive_more::Debug, Clone)]
pub struct PathSelectionData<'a> {
network_path: &'a transports::FourTuple,
#[debug(skip)]
source: StatsSource,
}
#[derive(Clone)]
enum StatsSource {
Live {
path_id: PathId,
conn: noq::Connection,
},
#[cfg(test)]
Test(Option<Box<PathStats>>),
}
#[cfg_attr(not(feature = "unstable-custom-transports"), allow(unreachable_pub))]
impl<'a> PathSelectionData<'a> {
fn live(
network_path: &'a transports::FourTuple,
path_id: PathId,
conn: noq::Connection,
) -> Self {
Self {
network_path,
source: StatsSource::Live { path_id, conn },
}
}
#[cfg(test)]
pub(crate) fn for_test(
network_path: &'a transports::FourTuple,
stats: Option<PathStats>,
) -> Self {
Self {
network_path,
source: StatsSource::Test(stats.map(Box::new)),
}
}
pub fn network_path(&self) -> &transports::FourTuple {
self.network_path
}
pub fn stats(&self) -> Option<PathStats> {
match &self.source {
StatsSource::Live { path_id, conn } => conn.path_stats(*path_id),
#[cfg(test)]
StatsSource::Test(stats) => stats.as_deref().copied(),
}
}
}
#[cfg_attr(not(feature = "unstable-custom-transports"), allow(unreachable_pub))]
pub trait PathSelector: Send + Sync + std::fmt::Debug + 'static {
fn select(&self, ctx: &PathSelectionContext<'_>) -> PathSelection;
}
#[derive(Debug, Clone)]
#[cfg_attr(not(feature = "unstable-custom-transports"), allow(unreachable_pub))]
pub struct PathSelection {
selection: Option<transports::FourTuple>,
}
#[cfg_attr(not(feature = "unstable-custom-transports"), allow(unreachable_pub))]
impl PathSelection {
pub fn none() -> Self {
Self { selection: None }
}
pub fn set(&mut self, path: &PathSelectionData<'_>) {
if self.selection.is_some() {
tracing::warn!(
path = %path.network_path(),
"PathSelection already contains a path; ignoring additional path"
);
return;
}
self.selection = Some(path.network_path.clone());
}
pub(crate) fn selected(&self) -> Option<&transports::FourTuple> {
self.selection.as_ref()
}
}
fn now_or_never<T, F: Future<Output = T>>(fut: F) -> Option<T> {
let fut = std::pin::pin!(fut);
match fut.poll(&mut std::task::Context::from_waker(std::task::Waker::noop())) {
Poll::Ready(res) => Some(res),
Poll::Pending => None,
}
}
struct OnClosed {
conn_id: ConnId,
inner: noq::OnClosed,
}
impl OnClosed {
fn new(conn: &noq::Connection) -> Self {
Self {
conn_id: ConnId(conn.stable_id()),
inner: conn.on_closed(),
}
}
}
impl Future for OnClosed {
type Output = (ConnId, Closed);
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
let closed = std::task::ready!(Pin::new(&mut self.inner).poll(cx));
Poll::Ready((self.conn_id, closed))
}
}
fn to_transports_addr(
endpoint_id: EndpointId,
addrs: impl IntoIterator<Item = TransportAddr>,
) -> impl Iterator<Item = transports::Addr> {
addrs.into_iter().filter_map(move |addr| match addr {
TransportAddr::Relay(relay_url) => Some(transports::Addr::from((relay_url, endpoint_id))),
TransportAddr::Ip(sockaddr) => Some(transports::Addr::from(sockaddr)),
TransportAddr::Custom(custom_addr) => Some(transports::Addr::from(custom_addr)),
_ => {
warn!(?addr, "Unsupported TransportAddr");
None
}
})
}
async fn maybe_next<S: Stream + Unpin>(maybe_stream: Option<&mut S>) -> Option<Option<S::Item>> {
match maybe_stream {
None => None,
Some(s) => Some(s.next().await),
}
}