use std::net::SocketAddr;
use std::time::Duration;
/// Limit of 30 candidates per peer.
///
/// NOTE(review): not referenced in the visible part of this file; presumably
/// enforced by whatever code collects candidates — confirm against callers.
pub const MAX_CANDIDATES_PER_PEER: usize = 30;
/// Limit of 10 inactive candidates.
///
/// NOTE(review): not referenced in the visible part of this file; what counts
/// as "inactive" is decided elsewhere — confirm against callers.
pub const MAX_INACTIVE_CANDIDATES: usize = 10;
/// Hysteresis margin (5 ms) used by `select_best_path`: when it compares RTTs,
/// a candidate must beat the current path's effective RTT by more than this
/// amount before the selection moves to it.
pub const RTT_SWITCHING_MIN: Duration = Duration::from_millis(5);
/// Bonus (3 ms) subtracted, saturating at zero, from the RTT of an IPv6 path
/// before it is compared with other paths, so IPv6 wins ties and near-ties.
pub const IPV6_RTT_ADVANTAGE: Duration = Duration::from_millis(3);
/// Kind of path to a peer, as tagged on `PathCandidate` and `PathInfo`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PathType {
/// Direct path; `select_best_path` prefers these over relay paths.
Direct,
/// Relay path; `select_best_path` falls back to these when no direct
/// candidate is available.
Relay,
}
/// A candidate path considered by `select_best_path`.
#[derive(Debug, Clone)]
pub struct PathCandidate {
/// Socket address of the path; its address family decides whether the
/// IPv6 RTT bonus applies in `effective_rtt`.
pub addr: SocketAddr,
/// Round-trip time supplied by the caller, before any IPv6 adjustment.
pub rtt: Duration,
/// Whether this is a direct or a relay path.
pub path_type: PathType,
}
impl PathCandidate {
    /// Builds a candidate for `addr` with the given `rtt`.
    ///
    /// The path type is [`PathType::Direct`], exactly as with
    /// [`PathCandidate::direct`].
    pub fn new(addr: SocketAddr, rtt: Duration) -> Self {
        Self::direct(addr, rtt)
    }

    /// Builds a direct-path candidate.
    pub fn direct(addr: SocketAddr, rtt: Duration) -> Self {
        let path_type = PathType::Direct;
        Self {
            addr,
            rtt,
            path_type,
        }
    }

    /// Builds a relay-path candidate.
    pub fn relay(addr: SocketAddr, rtt: Duration) -> Self {
        let path_type = PathType::Relay;
        Self {
            addr,
            rtt,
            path_type,
        }
    }

    /// Returns `true` when this candidate is a direct path.
    pub fn is_direct(&self) -> bool {
        matches!(self.path_type, PathType::Direct)
    }

    /// Returns `true` when this candidate is a relay path.
    pub fn is_relay(&self) -> bool {
        matches!(self.path_type, PathType::Relay)
    }

    /// RTT used for comparisons between candidates.
    ///
    /// IPv4 addresses report the raw `rtt`; IPv6 addresses report the raw
    /// `rtt` minus `IPV6_RTT_ADVANTAGE`, saturating at zero.
    pub fn effective_rtt(&self) -> Duration {
        match self.addr {
            SocketAddr::V4(_) => self.rtt,
            SocketAddr::V6(_) => self.rtt.saturating_sub(IPV6_RTT_ADVANTAGE),
        }
    }
}
/// Picks the path to use from `paths`, given the path currently in use.
///
/// Rules, in order:
///
/// 1. An empty `paths` slice yields `None`, whatever `current` is.
/// 2. The best candidate is the direct path with the lowest effective RTT;
///    only when there is no direct path is the lowest-RTT relay path used.
///    Ties go to the earliest entry in `paths`.
/// 3. With no `current` path, the best candidate is returned.
/// 4. A direct `current` path is never given up for a relay candidate.
/// 5. A relay `current` path is always given up for a direct candidate, so
///    the result honours the same direct-first ordering as rule 2 regardless
///    of which path happened to be selected earlier.
/// 6. Otherwise (same path type) the candidate replaces `current` only when
///    its effective RTT is lower by more than `RTT_SWITCHING_MIN`; this
///    hysteresis keeps the selection from flapping between similar paths.
///
/// `current` does not have to be an element of `paths`. The returned value is
/// a clone of either `current` or an element of `paths`.
pub fn select_best_path(
    paths: &[PathCandidate],
    current: Option<&PathCandidate>,
) -> Option<PathCandidate> {
    if paths.is_empty() {
        return None;
    }

    let direct_paths: Vec<&PathCandidate> = paths.iter().filter(|p| p.is_direct()).collect();
    let relay_paths: Vec<&PathCandidate> = paths.iter().filter(|p| p.is_relay()).collect();

    // Path type outranks RTT: relay candidates are only looked at when no
    // direct candidate exists.
    let best_new = find_best_by_rtt(&direct_paths).or_else(|| find_best_by_rtt(&relay_paths));

    let (current, new) = match (current, best_new) {
        (None, best) => return best.cloned(),
        // Kept for exhaustiveness; a non-empty `paths` always has a best
        // candidate because every path is either direct or relay.
        (Some(current), None) => return Some(current.clone()),
        (Some(current), Some(new)) => (current, new),
    };

    // Never downgrade from direct to relay.
    if current.is_direct() && new.is_relay() {
        return Some(current.clone());
    }

    // Always upgrade from relay to direct; hysteresis only arbitrates between
    // paths of the same type.
    if current.is_relay() && new.is_direct() {
        return Some(new.clone());
    }

    // `saturating_add` instead of `+`: `Duration` addition panics on overflow,
    // which an extreme RTT value would otherwise trigger.
    let threshold = new.effective_rtt().saturating_add(RTT_SWITCHING_MIN);
    let chosen = if current.effective_rtt() > threshold {
        new
    } else {
        current
    };
    Some(chosen.clone())
}
/// Returns the candidate with the smallest effective RTT, or `None` for an
/// empty slice. When several candidates share the smallest value, the one
/// that appears first in `paths` is returned.
fn find_best_by_rtt<'a>(paths: &[&'a PathCandidate]) -> Option<&'a PathCandidate> {
    let mut best: Option<&'a PathCandidate> = None;
    for &candidate in paths {
        // Strict `<` keeps the earlier candidate on ties.
        let improves = match best {
            Some(incumbent) => candidate.effective_rtt() < incumbent.effective_rtt(),
            None => true,
        };
        if improves {
            best = Some(candidate);
        }
    }
    best
}
/// Chooses between an IPv4 and an IPv6 address for the same destination.
///
/// The IPv6 RTT is first reduced by `IPV6_RTT_ADVANTAGE` (saturating at
/// zero). IPv4 is chosen only when its RTT is strictly lower than that
/// reduced value; in every other case, ties included, IPv6 is chosen.
///
/// Returns the chosen address together with its original, unadjusted RTT.
pub fn select_v4_v6(
    v4_addr: SocketAddr,
    v4_rtt: Duration,
    v6_addr: SocketAddr,
    v6_rtt: Duration,
) -> (SocketAddr, Duration) {
    let discounted_v6 = v6_rtt.saturating_sub(IPV6_RTT_ADVANTAGE);
    if v4_rtt < discounted_v6 {
        return (v4_addr, v4_rtt);
    }
    (v6_addr, v6_rtt)
}
use std::collections::HashMap;
/// Number of open direct paths that a `PathManager` created with
/// `PathManager::new` leaves open when `close_redundant_paths` runs.
pub const MIN_DIRECT_PATHS: usize = 2;
/// State that `PathManager` keeps for one path, keyed by `addr`.
#[derive(Debug, Clone)]
pub struct PathInfo {
/// Socket address of the path; also the key under which `PathManager`
/// stores this entry.
pub addr: SocketAddr,
/// Whether this is a direct or a relay path.
pub path_type: PathType,
/// Last RTT recorded for the path, or `None` when none has been set yet.
pub rtt: Option<Duration>,
/// Open/closed flag; only open paths are counted and listed by `PathManager`.
pub is_open: bool,
}
impl PathInfo {
pub fn direct(addr: SocketAddr) -> Self {
Self {
addr,
path_type: PathType::Direct,
rtt: None,
is_open: true,
}
}
pub fn relay(addr: SocketAddr) -> Self {
Self {
addr,
path_type: PathType::Relay,
rtt: None,
is_open: true,
}
}
pub fn with_rtt(mut self, rtt: Duration) -> Self {
self.rtt = Some(rtt);
self
}
}
#[derive(Debug, Default)]
pub struct PathManager {
paths: HashMap<SocketAddr, PathInfo>,
selected_path: Option<SocketAddr>,
min_direct_paths: usize,
}
impl PathManager {
pub fn new() -> Self {
Self {
paths: HashMap::new(),
selected_path: None,
min_direct_paths: MIN_DIRECT_PATHS,
}
}
pub fn with_min_direct_paths(min_direct_paths: usize) -> Self {
Self {
paths: HashMap::new(),
selected_path: None,
min_direct_paths,
}
}
pub fn add_path(&mut self, info: PathInfo) {
self.paths.insert(info.addr, info);
}
pub fn remove_path(&mut self, addr: &SocketAddr) {
self.paths.remove(addr);
if self.selected_path.as_ref() == Some(addr) {
self.selected_path = None;
}
}
pub fn set_selected_path(&mut self, addr: SocketAddr) {
self.selected_path = Some(addr);
}
pub fn selected_path(&self) -> Option<SocketAddr> {
self.selected_path
}
pub fn contains(&self, addr: &SocketAddr) -> bool {
self.paths.contains_key(addr)
}
pub fn is_relay_path(&self, addr: &SocketAddr) -> bool {
self.paths
.get(addr)
.map(|p| p.path_type == PathType::Relay)
.unwrap_or(false)
}
pub fn direct_path_count(&self) -> usize {
self.paths
.values()
.filter(|p| p.path_type == PathType::Direct && p.is_open)
.count()
}
pub fn relay_path_count(&self) -> usize {
self.paths
.values()
.filter(|p| p.path_type == PathType::Relay && p.is_open)
.count()
}
pub fn open_paths(&self) -> Vec<&PathInfo> {
self.paths.values().filter(|p| p.is_open).collect()
}
pub fn close_redundant_paths(&mut self) -> Vec<SocketAddr> {
let Some(selected) = self.selected_path else {
return Vec::new();
};
let open_direct: Vec<_> = self
.paths
.iter()
.filter(|(_, p)| p.path_type == PathType::Direct && p.is_open)
.map(|(addr, _)| *addr)
.collect();
if open_direct.len() <= self.min_direct_paths {
return Vec::new();
}
let excess = open_direct.len() - self.min_direct_paths;
let mut to_close = Vec::new();
for addr in open_direct {
if to_close.len() >= excess {
break;
}
if addr != selected {
to_close.push(addr);
}
}
for addr in &to_close {
if let Some(path) = self.paths.get_mut(addr) {
path.is_open = false;
}
}
tracing::debug!(
closed = to_close.len(),
remaining = self.direct_path_count(),
"Closed redundant paths"
);
to_close
}
pub fn update_rtt(&mut self, addr: &SocketAddr, rtt: Duration) {
if let Some(path) = self.paths.get_mut(addr) {
path.rtt = Some(rtt);
}
}
pub fn mark_open(&mut self, addr: &SocketAddr) {
if let Some(path) = self.paths.get_mut(addr) {
path.is_open = true;
}
}
pub fn mark_closed(&mut self, addr: &SocketAddr) {
if let Some(path) = self.paths.get_mut(addr) {
path.is_open = false;
}
}
}
/// Unit tests for path selection (`select_best_path`, `select_v4_v6`) and for
/// `PathManager` bookkeeping.
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6};

    /// IPv4 test address `192.168.1.1:<port>`.
    fn v4_addr(port: u16) -> SocketAddr {
        SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(192, 168, 1, 1), port))
    }

    /// IPv6 test address `[2001:db8::1]:<port>`.
    fn v6_addr(port: u16) -> SocketAddr {
        SocketAddr::V6(SocketAddrV6::new(
            Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1),
            port,
            0,
            0,
        ))
    }

    #[test]
    fn test_selects_lower_rtt_path() {
        let paths = vec![
            PathCandidate::new(v4_addr(5000), Duration::from_millis(50)),
            PathCandidate::new(v4_addr(5001), Duration::from_millis(20)),
            PathCandidate::new(v4_addr(5002), Duration::from_millis(100)),
        ];
        let selected = select_best_path(&paths, None);
        assert_eq!(selected.as_ref().map(|p| p.addr.port()), Some(5001));
    }

    #[test]
    fn test_hysteresis_prevents_flapping() {
        // 2 ms better is inside the 5 ms hysteresis margin: stay put.
        let current = PathCandidate::new(v4_addr(5000), Duration::from_millis(50));
        let paths = vec![
            current.clone(),
            PathCandidate::new(v4_addr(5001), Duration::from_millis(48)),
        ];
        let selected = select_best_path(&paths, Some(&current));
        assert_eq!(selected.as_ref().map(|p| p.addr.port()), Some(5000));
    }

    #[test]
    fn test_switches_when_significantly_better() {
        // 10 ms better exceeds the 5 ms hysteresis margin: switch.
        let current = PathCandidate::new(v4_addr(5000), Duration::from_millis(50));
        let paths = vec![
            current.clone(),
            PathCandidate::new(v4_addr(5001), Duration::from_millis(40)),
        ];
        let selected = select_best_path(&paths, Some(&current));
        assert_eq!(selected.as_ref().map(|p| p.addr.port()), Some(5001));
    }

    #[test]
    fn test_ipv6_preference() {
        let paths = vec![
            PathCandidate::new(v4_addr(5000), Duration::from_millis(50)),
            PathCandidate::new(v6_addr(5001), Duration::from_millis(50)),
        ];
        let selected = select_best_path(&paths, None);
        assert!(selected.as_ref().map(|p| p.addr.is_ipv6()).unwrap_or(false));
    }

    #[test]
    fn test_ipv6_advantage_applied_correctly() {
        // 50 ms over IPv6 counts as 47 ms, which beats 48 ms over IPv4.
        let paths = vec![
            PathCandidate::new(v4_addr(5000), Duration::from_millis(48)),
            PathCandidate::new(v6_addr(5001), Duration::from_millis(50)),
        ];
        let selected = select_best_path(&paths, None);
        assert!(selected.as_ref().map(|p| p.addr.is_ipv6()).unwrap_or(false));
    }

    #[test]
    fn test_direct_preferred_over_relay() {
        let paths = vec![
            PathCandidate::direct(v4_addr(5000), Duration::from_millis(100)),
            PathCandidate::relay(v4_addr(5001), Duration::from_millis(50)),
        ];
        let selected = select_best_path(&paths, None);
        assert!(selected.as_ref().map(|p| p.is_direct()).unwrap_or(false));
    }

    #[test]
    fn test_falls_back_to_relay_when_no_direct() {
        let paths = vec![
            PathCandidate::relay(v4_addr(5000), Duration::from_millis(100)),
            PathCandidate::relay(v4_addr(5001), Duration::from_millis(50)),
        ];
        let selected = select_best_path(&paths, None);
        assert_eq!(selected.as_ref().map(|p| p.addr.port()), Some(5001));
    }

    #[test]
    fn test_never_switches_from_direct_to_relay() {
        let current = PathCandidate::direct(v4_addr(5000), Duration::from_millis(100));
        let paths = vec![
            current.clone(),
            PathCandidate::relay(v4_addr(5001), Duration::from_millis(10)),
        ];
        let selected = select_best_path(&paths, Some(&current));
        assert!(selected.as_ref().map(|p| p.is_direct()).unwrap_or(false));
    }

    #[test]
    fn test_empty_paths_returns_none() {
        let paths: Vec<PathCandidate> = vec![];
        let selected = select_best_path(&paths, None);
        assert!(selected.is_none());
    }

    #[test]
    fn test_all_paths_same_rtt() {
        let paths = vec![
            PathCandidate::new(v4_addr(5000), Duration::from_millis(50)),
            PathCandidate::new(v4_addr(5001), Duration::from_millis(50)),
            PathCandidate::new(v4_addr(5002), Duration::from_millis(50)),
        ];
        let selected = select_best_path(&paths, None);
        assert!(selected.is_some());
    }

    #[test]
    fn test_select_v4_v6_prefers_faster() {
        let (addr, rtt) = select_v4_v6(
            v4_addr(5000),
            Duration::from_millis(100),
            v6_addr(5001),
            Duration::from_millis(50),
        );
        assert!(addr.is_ipv6());
        assert_eq!(rtt, Duration::from_millis(50));
    }

    #[test]
    fn test_select_v4_v6_applies_ipv6_advantage() {
        let (addr, _) = select_v4_v6(
            v4_addr(5000),
            Duration::from_millis(48),
            v6_addr(5001),
            Duration::from_millis(50),
        );
        assert!(addr.is_ipv6());
    }

    #[test]
    fn test_path_manager_closes_redundant_direct_paths() {
        let mut manager = PathManager::with_min_direct_paths(2);
        for port in 5000..5005 {
            manager.add_path(PathInfo::direct(v4_addr(port)));
        }
        manager.set_selected_path(v4_addr(5000));
        let closed = manager.close_redundant_paths();
        assert_eq!(closed.len(), 3);
        assert!(!closed.contains(&v4_addr(5000)));
        assert_eq!(manager.direct_path_count(), 2);
    }

    #[test]
    fn test_path_manager_keeps_minimum_direct_paths() {
        let mut manager = PathManager::with_min_direct_paths(2);
        manager.add_path(PathInfo::direct(v4_addr(5000)));
        manager.add_path(PathInfo::direct(v4_addr(5001)));
        manager.set_selected_path(v4_addr(5000));
        let closed = manager.close_redundant_paths();
        assert!(closed.is_empty());
        assert_eq!(manager.direct_path_count(), 2);
    }

    #[test]
    fn test_path_manager_never_closes_relay_paths() {
        let mut manager = PathManager::with_min_direct_paths(1);
        manager.add_path(PathInfo::direct(v4_addr(5000)));
        manager.add_path(PathInfo::direct(v4_addr(5001)));
        manager.add_path(PathInfo::direct(v4_addr(5002)));
        manager.add_path(PathInfo::relay(v4_addr(6000)));
        manager.add_path(PathInfo::relay(v4_addr(6001)));
        manager.set_selected_path(v4_addr(5000));
        let closed = manager.close_redundant_paths();
        for addr in &closed {
            assert!(!manager.is_relay_path(addr), "Closed a relay path!");
        }
        assert_eq!(manager.relay_path_count(), 2);
    }

    #[test]
    fn test_path_manager_does_not_close_selected_path() {
        let mut manager = PathManager::with_min_direct_paths(1);
        manager.add_path(PathInfo::direct(v4_addr(5000)));
        manager.add_path(PathInfo::direct(v4_addr(5001)));
        manager.add_path(PathInfo::direct(v4_addr(5002)));
        manager.set_selected_path(v4_addr(5000));
        let closed = manager.close_redundant_paths();
        assert_eq!(closed.len(), 2);
        assert!(!closed.contains(&v4_addr(5000)));
        assert!(manager.contains(&v4_addr(5000)));
    }

    #[test]
    fn test_path_manager_no_close_without_selected() {
        let mut manager = PathManager::with_min_direct_paths(1);
        manager.add_path(PathInfo::direct(v4_addr(5000)));
        manager.add_path(PathInfo::direct(v4_addr(5001)));
        manager.add_path(PathInfo::direct(v4_addr(5002)));
        let closed = manager.close_redundant_paths();
        assert!(closed.is_empty());
    }

    #[test]
    fn test_path_manager_add_and_remove() {
        let mut manager = PathManager::new();
        let addr = v4_addr(5000);
        manager.add_path(PathInfo::direct(addr));
        assert!(manager.contains(&addr));
        manager.remove_path(&addr);
        assert!(!manager.contains(&addr));
    }

    #[test]
    fn test_path_manager_update_rtt() {
        let mut manager = PathManager::new();
        let addr = v4_addr(5000);
        manager.add_path(PathInfo::direct(addr));
        manager.update_rtt(&addr, Duration::from_millis(50));
        let paths = manager.open_paths();
        assert_eq!(paths.len(), 1);
        assert_eq!(paths[0].rtt, Some(Duration::from_millis(50)));
    }

    #[test]
    fn test_path_manager_mark_open_closed() {
        let mut manager = PathManager::new();
        let addr = v4_addr(5000);
        manager.add_path(PathInfo::direct(addr));
        assert_eq!(manager.direct_path_count(), 1);
        manager.mark_closed(&addr);
        assert_eq!(manager.direct_path_count(), 0);
        manager.mark_open(&addr);
        assert_eq!(manager.direct_path_count(), 1);
    }

    #[test]
    fn test_path_manager_selected_path_cleared_on_remove() {
        let mut manager = PathManager::new();
        let addr = v4_addr(5000);
        manager.add_path(PathInfo::direct(addr));
        manager.set_selected_path(addr);
        assert_eq!(manager.selected_path(), Some(addr));
        manager.remove_path(&addr);
        assert_eq!(manager.selected_path(), None);
    }
}