use std::{
collections::{HashMap, HashSet, VecDeque},
sync::Arc,
};
use n0_error::e;
use n0_future::time::Instant;
use rustc_hash::FxHashMap;
use tokio::sync::oneshot;
use tracing::trace;
use super::{Source, TransportAddrInfo, TransportAddrUsage};
use crate::{address_lookup::AddressLookupFailed, metrics::SocketMetrics, socket::transports};
/// Pruning threshold: `prune_non_relay_paths` removes nothing while fewer than this many
/// non-relay paths are stored.
pub(super) const MAX_NON_RELAY_PATHS: usize = 30;
/// Upper bound on how many inactive non-relay paths (those closed longest ago) a single call to
/// `prune_non_relay_paths` removes.
pub(super) const MAX_INACTIVE_NON_RELAY_PATHS: usize = 10;
/// Tracks the known transport addresses of a remote together with the state of each path.
///
/// Also queues callers that asked to be notified once at least one address is known.
#[derive(Debug)]
pub(super) struct RemotePathState {
/// All known paths, keyed by transport address.
paths: FxHashMap<transports::Addr, PathState>,
/// Senders queued by `resolve_remote` while `paths` was empty; drained and answered by
/// `emit_pending_resolve_requests`.
pending_resolve_requests: VecDeque<oneshot::Sender<Result<(), AddressLookupFailed>>>,
/// Counters incremented when a path is inserted as open or an open path is abandoned.
metrics: Arc<SocketMetrics>,
}
/// Status of a single path as tracked by `RemotePathState`.
#[derive(Debug, Default)]
pub(super) enum PathStatus {
/// The path is open; set by `RemotePathState::insert_open_path`.
Open,
/// The path was open at some point and has since been abandoned.
///
/// Holds the time of the most recent `RemotePathState::abandoned_path` call for it.
Inactive(Instant),
/// The path was abandoned while its status was `Unknown` (or already `Unusable`).
Unusable,
/// Default status of a newly tracked address, e.g. one added via
/// `RemotePathState::insert_multiple`.
#[default]
Unknown,
}
impl RemotePathState {
    /// Creates an empty state with no paths and no pending resolve requests.
    ///
    /// `metrics` receives the per-transport "paths added" / "paths removed" counts.
    pub(super) fn new(metrics: Arc<SocketMetrics>) -> Self {
        Self {
            paths: Default::default(),
            pending_resolve_requests: Default::default(),
            metrics,
        }
    }

    /// Returns every known address together with its usage.
    ///
    /// Paths with status [`PathStatus::Open`] are reported as
    /// [`TransportAddrUsage::Active`], all other statuses as
    /// [`TransportAddrUsage::Inactive`]. The order follows the map's iteration order.
    pub(super) fn to_remote_addrs(&self) -> Vec<TransportAddrInfo> {
        self.paths
            .iter()
            .map(|(addr, state)| {
                let usage = match state.status {
                    PathStatus::Open => TransportAddrUsage::Active,
                    PathStatus::Inactive(_) | PathStatus::Unusable | PathStatus::Unknown => {
                        TransportAddrUsage::Inactive
                    }
                };
                TransportAddrInfo {
                    addr: addr.clone().into(),
                    usage,
                }
            })
            .collect()
    }

    /// Records `addr` as an open path that was learned from `source`.
    ///
    /// The per-transport "paths added" counter is incremented on every call, whether or
    /// not the address was already tracked. The path is marked [`PathStatus::Open`] and
    /// `source` is timestamped with the current time. Afterwards pending resolve requests
    /// are answered and the path set is pruned.
    pub(super) fn insert_open_path(&mut self, addr: transports::Addr, source: Source) {
        match addr {
            transports::Addr::Ip(_) => self.metrics.transport_ip_paths_added.inc(),
            transports::Addr::Relay(_, _) => self.metrics.transport_relay_paths_added.inc(),
            transports::Addr::Custom(_) => self.metrics.transport_custom_paths_added.inc(),
        };
        let state = self.paths.entry(addr).or_default();
        state.status = PathStatus::Open;
        // `source` is owned and not needed afterwards, so it is moved rather than cloned.
        state.sources.insert(source, Instant::now());
        self.emit_pending_resolve_requests(None);
        self.prune_paths();
    }

    /// Marks the path to `addr` as abandoned.
    ///
    /// Does nothing if `addr` is not tracked. The per-transport "paths removed" counter is
    /// only incremented when the path was [`PathStatus::Open`]. An open or inactive path
    /// becomes [`PathStatus::Inactive`] stamped with the current time; a path with unknown
    /// or unusable status becomes [`PathStatus::Unusable`].
    pub(super) fn abandoned_path(&mut self, addr: &transports::Addr) {
        if let Some(state) = self.paths.get_mut(addr) {
            if matches!(state.status, PathStatus::Open) {
                match addr {
                    transports::Addr::Ip(_) => self.metrics.transport_ip_paths_removed.inc(),
                    transports::Addr::Relay(_, _) => {
                        self.metrics.transport_relay_paths_removed.inc()
                    }
                    transports::Addr::Custom(_) => {
                        self.metrics.transport_custom_paths_removed.inc()
                    }
                };
            }
            match state.status {
                PathStatus::Open | PathStatus::Inactive(_) => {
                    state.status = PathStatus::Inactive(Instant::now());
                }
                PathStatus::Unusable | PathStatus::Unknown => {
                    state.status = PathStatus::Unusable;
                }
            }
        }
    }

    /// Adds several addresses that were all learned from `source`.
    ///
    /// New addresses start with the default status; addresses that are already tracked
    /// keep their status and only get `source` (re)stamped with the current time.
    /// Afterwards pending resolve requests are answered and the path set is pruned.
    pub(super) fn insert_multiple(
        &mut self,
        addrs: impl Iterator<Item = transports::Addr>,
        source: Source,
    ) {
        // One timestamp for the whole batch.
        let now = Instant::now();
        for addr in addrs {
            self.paths
                .entry(addr)
                .or_default()
                .sources
                .insert(source.clone(), now);
        }
        trace!("added addressing information");
        self.emit_pending_resolve_requests(None);
        self.prune_paths();
    }

    /// Asks to be notified once at least one address is known.
    ///
    /// If any path is already tracked, `Ok(())` is sent on `tx` right away. Otherwise `tx`
    /// is queued until addresses are inserted or [`Self::address_lookup_finished`] is
    /// called. A failed send is ignored.
    pub(super) fn resolve_remote(&mut self, tx: oneshot::Sender<Result<(), AddressLookupFailed>>) {
        if !self.paths.is_empty() {
            tx.send(Ok(())).ok();
        } else {
            self.pending_resolve_requests.push_back(tx);
        }
    }

    /// Answers all pending resolve requests after address lookup has finished.
    ///
    /// The error of `result`, if any, is only forwarded when no path is known.
    pub(super) fn address_lookup_finished(&mut self, result: Result<(), AddressLookupFailed>) {
        self.emit_pending_resolve_requests(result.err());
    }

    /// Iterates over all tracked addresses, in the map's iteration order.
    pub(super) fn addrs(&self) -> impl Iterator<Item = &transports::Addr> {
        self.paths.keys()
    }

    /// Returns `true` if no path is tracked.
    pub(super) fn is_empty(&self) -> bool {
        self.paths.is_empty()
    }

    /// Sends one result to every queued resolve request and empties the queue.
    ///
    /// The result is `Ok(())` if any path is tracked. Otherwise it is
    /// `address_lookup_error` if given, or `AddressLookupFailed::NoResults` with an empty
    /// error list. Failed sends are ignored.
    fn emit_pending_resolve_requests(&mut self, address_lookup_error: Option<AddressLookupFailed>) {
        if self.pending_resolve_requests.is_empty() {
            return;
        }
        let result = match (self.paths.is_empty(), address_lookup_error) {
            (false, _) => Ok(()),
            (true, Some(err)) => Err(err),
            (true, None) => Err(e!(AddressLookupFailed::NoResults { errors: Vec::new() })),
        };
        for tx in self.pending_resolve_requests.drain(..) {
            tx.send(result.clone()).ok();
        }
    }

    /// Prunes the tracked paths; see `prune_non_relay_paths` for the rules.
    pub(super) fn prune_paths(&mut self) {
        prune_non_relay_paths(&mut self.paths);
    }
}
/// State kept for a single path.
#[derive(Debug, Default)]
pub(super) struct PathState {
/// Every source this address was learned from, mapped to the time it was last recorded.
pub(super) sources: HashMap<Source, Instant>,
/// Current status of the path.
pub(super) status: PathStatus,
}
fn prune_non_relay_paths(paths: &mut FxHashMap<transports::Addr, PathState>) {
if paths.len() < MAX_NON_RELAY_PATHS {
return;
}
let primary_paths: Vec<_> = paths.iter().filter(|(addr, _)| !addr.is_relay()).collect();
if primary_paths.len() < MAX_NON_RELAY_PATHS {
return;
}
let mut inactive = Vec::with_capacity(primary_paths.len());
let mut failed = Vec::with_capacity(primary_paths.len());
for (addr, state) in primary_paths {
match state.status {
PathStatus::Inactive(t) => {
inactive.push((addr.clone(), t));
}
PathStatus::Unusable => {
failed.push(addr.clone());
}
_ => {
}
}
}
if failed.len() == paths.len() {
failed.truncate(paths.len().saturating_sub(MAX_NON_RELAY_PATHS));
}
inactive.sort_by_key(|b| std::cmp::Reverse(b.1));
let old_inactive =
inactive.split_off(inactive.len().saturating_sub(MAX_INACTIVE_NON_RELAY_PATHS));
let must_prune: HashSet<_> = failed
.into_iter()
.chain(old_inactive.into_iter().map(|(addr, _)| addr))
.collect();
paths.retain(|addr, _| !must_prune.contains(addr));
}
#[cfg(test)]
mod tests {
    use std::{
        net::{Ipv4Addr, SocketAddrV4},
        time::Duration,
    };

    use iroh_base::{RelayUrl, SecretKey};
    use rand::{RngExt, SeedableRng};

    use super::*;

    /// The map type that `prune_non_relay_paths` operates on.
    type PathMap = FxHashMap<transports::Addr, PathState>;

    /// Builds a localhost IP transport address with the given port.
    fn ip_addr(port: u16) -> transports::Addr {
        let socket = SocketAddrV4::new(Ipv4Addr::LOCALHOST, port);
        transports::Addr::Ip(socket.into())
    }

    /// A path without sources that was closed at `closed`.
    fn path_state_inactive(closed: Instant) -> PathState {
        PathState {
            status: PathStatus::Inactive(closed),
            ..Default::default()
        }
    }

    /// A path without sources that is marked unusable.
    fn path_state_unusable() -> PathState {
        PathState {
            status: PathStatus::Unusable,
            ..Default::default()
        }
    }

    /// Inserts one IP path per port in `ports`, each with a fresh state from `make_state`.
    fn fill(paths: &mut PathMap, ports: std::ops::Range<u16>, make_state: fn() -> PathState) {
        for port in ports {
            paths.insert(ip_addr(port), make_state());
        }
    }

    /// Below the threshold nothing is pruned.
    #[test]
    fn test_prune_under_max_paths() {
        let mut paths = PathMap::default();
        fill(&mut paths, 0..20, PathState::default);

        prune_non_relay_paths(&mut paths);

        assert_eq!(
            20,
            paths.len(),
            "should not prune when under MAX_NON_RELAY_PATHS"
        );
    }

    /// At the threshold, paths of unknown status are still kept.
    #[test]
    fn test_prune_at_max_paths_no_prunable() {
        let mut paths = PathMap::default();
        let limit = MAX_NON_RELAY_PATHS as u16;
        fill(&mut paths, 0..limit, PathState::default);

        prune_non_relay_paths(&mut paths);

        assert_eq!(
            MAX_NON_RELAY_PATHS,
            paths.len(),
            "should not prune active paths"
        );
    }

    /// Unusable paths are removed, paths of unknown status stay.
    #[test]
    fn test_prune_failed_holepunch() {
        let mut paths = PathMap::default();
        fill(&mut paths, 0..20, PathState::default);
        fill(&mut paths, 20..35, path_state_unusable);

        prune_non_relay_paths(&mut paths);

        assert_eq!(20, paths.len());
        assert!((0..20).all(|port| paths.contains_key(&ip_addr(port))));
        assert!(!(20..35).any(|port| paths.contains_key(&ip_addr(port))));
    }

    /// Of 20 inactive paths, the 10 closed longest ago are removed.
    #[test]
    fn test_prune_keeps_most_recent_inactive() {
        let mut paths = PathMap::default();
        let now = Instant::now();
        fill(&mut paths, 0..15, PathState::default);
        // Port 15 was closed 20s ago (oldest), port 34 was closed 1s ago (most recent).
        for (offset, port) in (15u16..35).enumerate() {
            let age = Duration::from_secs((20 - offset) as u64);
            paths.insert(ip_addr(port), path_state_inactive(now - age));
        }
        assert_eq!(35, paths.len());

        prune_non_relay_paths(&mut paths);

        assert_eq!(25, paths.len());
        assert!((0..15).all(|port| paths.contains_key(&ip_addr(port))));
        for port in 25..35 {
            assert!(
                paths.contains_key(&ip_addr(port)),
                "port {} should be kept",
                port
            );
        }
        for port in 15..25 {
            assert!(
                !paths.contains_key(&ip_addr(port)),
                "port {} should be pruned",
                port
            );
        }
    }

    /// Unusable paths and old inactive paths are pruned in the same pass.
    #[test]
    fn test_prune_mixed_must_and_can_prune() {
        let mut paths = PathMap::default();
        let now = Instant::now();
        fill(&mut paths, 0..15, PathState::default);
        fill(&mut paths, 15..20, path_state_unusable);
        // Port 20 was closed 15s ago (oldest), port 34 was closed 1s ago (most recent).
        for (offset, port) in (20u16..35).enumerate() {
            let age = Duration::from_secs((15 - offset) as u64);
            paths.insert(ip_addr(port), path_state_inactive(now - age));
        }
        assert_eq!(35, paths.len());

        prune_non_relay_paths(&mut paths);

        assert_eq!(20, paths.len());
        assert!((0..15).all(|port| paths.contains_key(&ip_addr(port))));
        assert!(!(15..20).any(|port| paths.contains_key(&ip_addr(port))));
        for port in 30..35 {
            assert!(
                paths.contains_key(&ip_addr(port)),
                "port {} should be kept",
                port
            );
        }
    }

    /// Relay paths do not count towards the non-relay threshold.
    #[test]
    fn test_prune_relay_paths_not_counted() {
        let mut paths = PathMap::default();
        fill(&mut paths, 0..25, path_state_unusable);

        let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
        let relay_url: RelayUrl = url::Url::parse("https://localhost")
            .expect("should be valid url")
            .into();
        for _ in 0..10 {
            let id = SecretKey::from_bytes(&rng.random()).public();
            paths.insert(
                transports::Addr::Relay(relay_url.clone(), id),
                PathState::default(),
            );
        }
        assert_eq!(35, paths.len());

        prune_non_relay_paths(&mut paths);

        assert_eq!(35, paths.len());
    }

    /// Paths of unknown status survive pruning.
    #[test]
    fn test_prune_preserves_never_dialed() {
        let mut paths = PathMap::default();
        fill(&mut paths, 0..20, PathState::default);
        fill(&mut paths, 20..35, path_state_unusable);

        prune_non_relay_paths(&mut paths);

        assert!((0..20).all(|port| paths.contains_key(&ip_addr(port))));
    }

    /// When every path is unusable, `MAX_NON_RELAY_PATHS` of them are kept.
    #[test]
    fn test_prune_all_paths_failed() {
        let mut paths = PathMap::default();
        fill(&mut paths, 0..40, path_state_unusable);
        assert_eq!(40, paths.len());

        prune_non_relay_paths(&mut paths);

        assert_eq!(
            MAX_NON_RELAY_PATHS,
            paths.len(),
            "should keep MAX_NON_RELAY_PATHS when all paths failed"
        );
    }

    /// Inserting an open path stores it as `Open` with exactly the given source.
    #[test]
    fn test_insert_open_path() {
        let mut remote = RemotePathState::new(Default::default());
        let (addr, source) = (ip_addr(1000), Source::Udp);
        assert!(remote.is_empty());

        remote.insert_open_path(addr.clone(), source.clone());

        assert!(!remote.is_empty());
        assert!(remote.paths.contains_key(&addr));
        let PathState { sources, status } = &remote.paths[&addr];
        assert!(matches!(status, PathStatus::Open));
        assert_eq!(sources.len(), 1);
        assert!(sources.contains_key(&source));
    }

    /// Walks a path through the status transitions and checks the metrics after each step.
    #[test]
    fn test_abandoned_path() {
        let metrics = Arc::new(SocketMetrics::default());
        let mut remote = RemotePathState::new(metrics.clone());
        // (paths added, paths removed) for IP transports.
        let counters = || {
            (
                metrics.transport_ip_paths_added.get(),
                metrics.transport_ip_paths_removed.get(),
            )
        };

        // Open path.
        let addr_open = ip_addr(1000);
        remote.insert_open_path(addr_open.clone(), Source::Udp);
        assert!(matches!(remote.paths[&addr_open].status, PathStatus::Open));
        assert_eq!(counters().0, 1);

        // Open -> Inactive counts as a removal once; Inactive -> Inactive does not count.
        for _ in 0..2 {
            remote.abandoned_path(&addr_open);
            assert!(matches!(
                remote.paths[&addr_open].status,
                PathStatus::Inactive(_)
            ));
            assert_eq!(counters(), (1, 1));
        }

        // A path that is merely known starts out as Unknown and touches no metric.
        let addr_unknown = ip_addr(2000);
        remote.insert_multiple(std::iter::once(addr_unknown.clone()), Source::Relay);
        assert!(matches!(
            remote.paths[&addr_unknown].status,
            PathStatus::Unknown
        ));
        assert_eq!(counters(), (1, 1));

        // Unknown -> Unusable -> Unusable, never counted as a removal.
        for _ in 0..2 {
            remote.abandoned_path(&addr_unknown);
            assert!(matches!(
                remote.paths[&addr_unknown].status,
                PathStatus::Unusable
            ));
            assert_eq!(counters(), (1, 1));
        }

        // Unusable -> Open counts as an added path.
        remote.insert_open_path(addr_unknown.clone(), Source::Udp);
        assert!(matches!(
            remote.paths[&addr_unknown].status,
            PathStatus::Open
        ));
        assert_eq!(counters(), (2, 1));
    }
}