use std::collections::{HashMap, HashSet};
use std::fmt;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Instant;
use futures::{StreamExt, stream::FuturesUnordered};
#[cfg(test)]
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use crate::config::{GlobalExecutor, GlobalRng};
use crate::dev_tool::Location;
use crate::message::{InnerMessage, NodeEvent, Transaction};
use crate::node::OpManager;
use crate::operations::OpError;
use crate::ring::{KnownPeerKeyLocation, PeerAddr, PeerKeyLocation};
use crate::router::{EstimatorType, IsotonicEstimator, IsotonicEvent};
use crate::transport::TransportKeypair;
use crate::util::{Contains, IterExt};
use super::VisitedPeers;
/// Task-driven connect operation; `start_client_connect` from this module is what
/// `join_ring_request` and `gateway_version_probe` ultimately await.
pub(crate) mod op_ctx_task;
/// Age after which an outstanding forward attempt is treated as failed by the test-only
/// `ConnectOp::expire_forward_attempts` (recorded as a failure in the forward estimator).
#[cfg(test)]
const FORWARD_ATTEMPT_TIMEOUT: Duration = Duration::from_secs(20);
/// Candidates whose entry in the `recency` map is younger than this are skipped by both
/// next-hop and uphill-hop selection.
const RECENCY_COOLDOWN: Duration = Duration::from_secs(30);
/// Minimum combined (estimator × reliability) score for a scored candidate to survive
/// `filter_low_score_peers`; also the acceptor-reliability floor below which all closer peers
/// are considered unreliable and the request escalates to uphill routing.
const MIN_FORWARD_SCORE: f64 = 0.15;
/// Ring distance to the target under which a forwarding relay may also accept the joiner
/// itself. Twice this value is the "close" radius when choosing an uphill peer.
const NEAR_TERMINUS_DISTANCE: f64 = 0.05;
/// Maximum probability of a near-terminus acceptance (reached at distance 0, falling linearly
/// to 0 at `NEAR_TERMINUS_DISTANCE`).
const NEAR_TERMINUS_ACCEPT_PROB: f64 = 0.6;
/// Uphill candidates must have acceptor reliability within this margin of the best candidate.
const UPHILL_RELIABILITY_TOLERANCE: f64 = 0.1;
fn filter_low_score_peers<T>(scored: &mut Vec<(f64, T)>, eligible_count: usize) {
if scored.len() + eligible_count <= 1 {
return;
}
let survivors = scored
.iter()
.filter(|(s, _)| *s >= MIN_FORWARD_SCORE)
.count();
if survivors > 0 || eligible_count > 0 {
scored.retain(|(score, _)| *score >= MIN_FORWARD_SCORE);
}
}
/// Wire messages exchanged by the connect (join) operation.
///
/// Every variant carries the operation's [`Transaction`] id. Variant order is part of the
/// serialized form (serde derive), so new variants should only be appended.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum ConnectMsg {
    /// A join request travelling from the joiner through relays toward `desired_location`.
    Request {
        id: Transaction,
        payload: ConnectRequest,
    },
    /// An acceptance, naming the peer that agreed to connect to the joiner.
    Response {
        id: Transaction,
        payload: ConnectResponse,
    },
    /// A socket address observed for the joiner by a relay.
    /// NOTE(review): presumably the address a relay filled in from `upstream_addr`
    /// (see `RelayActions::observed_address`) — confirm at the send site.
    ObservedAddress {
        id: Transaction,
        address: SocketAddr,
    },
    /// Signals that the request for `desired_location` could not be placed.
    /// NOTE(review): presumably emitted when `RelayActions::rejected` is set — confirm.
    Rejected {
        id: Transaction,
        desired_location: Location,
    },
    /// Reports that connecting to the acceptor at `failed_acceptor_addr` failed.
    ConnectFailed {
        id: Transaction,
        failed_acceptor_addr: SocketAddr,
    },
}
impl InnerMessage for ConnectMsg {
    /// Returns the transaction id; every variant carries one.
    fn id(&self) -> &Transaction {
        match self {
            Self::Request { id, .. } => id,
            Self::Response { id, .. } => id,
            Self::ObservedAddress { id, .. } => id,
            Self::Rejected { id, .. } => id,
            Self::ConnectFailed { id, .. } => id,
        }
    }

    /// Returns the ring location the message targets, if the variant carries one.
    ///
    /// Only `Request` (via its payload) and `Rejected` name a location; all other variants
    /// yield `None`.
    fn requested_location(&self) -> Option<Location> {
        let location = match self {
            Self::Request { payload, .. } => payload.desired_location,
            Self::Rejected {
                desired_location, ..
            } => *desired_location,
            Self::Response { .. } | Self::ObservedAddress { .. } | Self::ConnectFailed { .. } => {
                return None;
            }
        };
        Some(location)
    }
}
impl fmt::Display for ConnectMsg {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ConnectMsg::Request { payload, .. } => write!(
f,
"ConnectRequest {{ desired: {}, ttl: {}, uphill: {}, joiner: {} }}",
payload.desired_location, payload.ttl, payload.uphill_budget, payload.joiner
),
ConnectMsg::Response { payload, .. } => {
write!(f, "ConnectResponse {{ acceptor: {} }}", payload.acceptor,)
}
ConnectMsg::ObservedAddress { address, .. } => {
write!(f, "ObservedAddress {{ address: {address} }}")
}
ConnectMsg::Rejected {
desired_location, ..
} => {
write!(f, "Rejected {{ desired: {desired_location} }}")
}
ConnectMsg::ConnectFailed {
failed_acceptor_addr,
..
} => {
write!(
f,
"ConnectFailed {{ failed_acceptor: {failed_acceptor_addr} }}"
)
}
}
}
}
/// Initial uphill budget placed on new join requests, and the cap a relay applies to
/// budgets received from upstream (see `ConnectOp::new_relay`).
pub(crate) const DEFAULT_UPHILL_BUDGET: u8 = 8;
/// Payload of [`ConnectMsg::Request`]: a joiner asking to be connected near a ring location.
///
/// Field order is part of the serialized form.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct ConnectRequest {
    /// Ring location the joiner wants a connection near.
    pub desired_location: Location,
    /// The joining peer. Its address may start out unknown; the first relay fills it in from
    /// the address the request arrived from.
    pub joiner: PeerKeyLocation,
    /// Remaining hops; decremented (saturating) each time a relay forwards the request.
    pub ttl: u8,
    /// Addresses already visited by this request and excluded from routing.
    /// NOTE(review): queried via `probably_visited`, so this looks like a probabilistic
    /// filter that can report false positives — confirm in `VisitedPeers`.
    pub visited: VisitedPeers,
    /// Remaining number of "uphill" hops allowed when a terminus cannot accept. Older
    /// senders that omit the field deserialize with `DEFAULT_UPHILL_BUDGET`.
    #[serde(default = "default_uphill_budget")]
    pub uphill_budget: u8,
}
/// Serde default for [`ConnectRequest::uphill_budget`].
fn default_uphill_budget() -> u8 {
    DEFAULT_UPHILL_BUDGET
}
/// Payload of [`ConnectMsg::Response`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) struct ConnectResponse {
    /// Peer that accepted the joiner. Relay-side acceptances build this with an unknown
    /// address (`PeerKeyLocation::with_unknown_addr`).
    pub acceptor: PeerKeyLocation,
}
/// Per-transaction state of a connect operation.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub(crate) enum ConnectState {
    /// Joiner side: waiting for acceptances to arrive.
    WaitingForResponses(JoinerState),
    /// Relay side: handling a request received from an upstream peer.
    Relaying(Box<RelayState>),
    /// Operation finished. NOTE(review): not constructed in the code visible here.
    Completed,
}
/// Joiner-side bookkeeping.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub(crate) struct JoinerState {
    /// Number of distinct acceptors after which the join is considered satisfied.
    pub target_connections: usize,
    /// Latest address reported for this node, if any.
    pub observed_address: Option<SocketAddr>,
    /// Distinct acceptors registered so far.
    pub accepted: HashSet<PeerKeyLocation>,
    /// Time of the last new acceptor or observed-address update.
    pub last_progress: Instant,
    /// True if the joiner was created without a known own address.
    pub started_without_address: bool,
}
/// Relay-side bookkeeping for one forwarded join request.
#[derive(Debug, Clone)]
pub(crate) struct RelayState {
    /// Address the request was received from; marked visited and used as the joiner's
    /// address when the joiner's address is still unknown.
    pub upstream_addr: SocketAddr,
    /// Current copy of the request (TTL / uphill budget reflect what was sent downstream).
    pub request: ConnectRequest,
    /// Downstream peer the request was forwarded to, once forwarded.
    pub forwarded_to: Option<PeerKeyLocation>,
    /// When the request was forwarded.
    pub forwarded_at: Option<Instant>,
    /// Ensures the joiner's discovered address is reported at most once.
    pub observed_sent: bool,
    /// Ensures this relay issues at most one acceptance.
    pub accepted_locally: bool,
    /// NOTE(review): not read or written in the code visible here.
    pub response_forwarded: bool,
}
/// Environment queries a relay needs while processing a join request. Abstracted as a trait
/// so `RelayState::handle_request` can be driven without a live node.
pub(crate) trait RelayContext {
    /// This node's own key/location.
    fn self_location(&self) -> &PeerKeyLocation;
    /// Whether this node is willing to connect to `joiner` (whose address is known).
    fn should_accept(&self, joiner: &KnownPeerKeyLocation) -> bool;
    /// Chooses a peer to forward the request toward `desired_location`, skipping `visited`
    /// peers and peers in their `recency` cooldown. `None` means this node is the terminus.
    fn select_next_hop(
        &self,
        desired_location: Location,
        visited: &VisitedPeers,
        recency: &HashMap<PeerKeyLocation, Instant>,
        estimator: &ConnectForwardEstimator,
    ) -> Option<PeerKeyLocation>;
    /// Chooses a fallback hop when this node is the terminus but cannot accept.
    /// `None` means the request must be rejected.
    fn select_uphill_hop(
        &self,
        desired_location: Location,
        visited: &VisitedPeers,
        recency: &HashMap<PeerKeyLocation, Instant>,
    ) -> Option<PeerKeyLocation>;
}
/// An acceptance issued by this relay.
#[derive(Debug)]
pub(crate) struct AcceptOutcome {
    /// Response naming this node as acceptor (built with an unknown address).
    pub response: ConnectResponse,
    /// The joiner being accepted.
    pub joiner: PeerKeyLocation,
}
/// Side effects requested by one call to `RelayState::handle_request`. The caller is
/// responsible for carrying them out; several may be set at once (e.g. near-terminus
/// acceptance together with a forward).
#[derive(Debug, Default)]
pub(crate) struct RelayActions {
    /// Accept the joiner locally.
    pub accept: Option<AcceptOutcome>,
    /// Forward this request to this peer.
    pub forward: Option<(PeerKeyLocation, ConnectRequest)>,
    /// Report the joiner's newly discovered address.
    pub observed_address: Option<(PeerKeyLocation, SocketAddr)>,
    /// The request could not be accepted or routed further.
    pub rejected: bool,
}
/// An outstanding forward, keyed by peer in the forward-attempt map.
///
/// The fields are only read by test-only code (`ConnectOp`), hence the conditional
/// `dead_code` allowances.
#[derive(Debug, Clone)]
pub(crate) struct ForwardAttempt {
    #[cfg_attr(not(test), allow(dead_code))]
    peer: PeerKeyLocation,
    #[cfg_attr(not(test), allow(dead_code))]
    desired: Location,
    #[cfg_attr(not(test), allow(dead_code))]
    sent_at: Instant,
}
/// Learned model of how likely a forward to a given peer is to succeed for a target
/// location, backed by an [`IsotonicEstimator`] fed with success (1.0) / failure (0.0) events.
#[derive(Debug, Clone)]
pub(crate) struct ConnectForwardEstimator {
    estimator: IsotonicEstimator,
}
impl ConnectForwardEstimator {
    /// Builds an estimator seeded with two neutral (0.5) events at locations 0.0 and 0.5.
    ///
    /// The seed events use a throwaway peer (fresh keypair, `127.0.0.1:0`) so the isotonic
    /// model has data to fit before any real outcome is recorded.
    pub(crate) fn new() -> Self {
        let keypair = TransportKeypair::new();
        let placeholder =
            PeerKeyLocation::new(keypair.public().clone(), "127.0.0.1:0".parse().unwrap());
        let seed_events = [0.0, 0.5].map(|loc| IsotonicEvent {
            peer: placeholder.clone(),
            contract_location: Location::new(loc),
            result: 0.5,
        });
        Self {
            estimator: IsotonicEstimator::new(seed_events, EstimatorType::Negative),
        }
    }

    /// Records whether forwarding to `peer` for target `desired` succeeded.
    /// Peers without a known ring location are ignored.
    pub(super) fn record(&mut self, peer: &PeerKeyLocation, desired: Location, success: bool) {
        if peer.location().is_some() {
            let result = if success { 1.0 } else { 0.0 };
            self.estimator.add_event(IsotonicEvent {
                peer: peer.clone(),
                contract_location: desired,
                result,
            });
        }
    }

    /// Estimated success score in `[0.0, 1.0]` for forwarding to `peer` toward `desired`.
    ///
    /// Returns `None` if the peer has no location or the model cannot produce an estimate.
    fn estimate(&self, peer: &PeerKeyLocation, desired: Location) -> Option<f64> {
        if peer.location().is_none() {
            return None;
        }
        let raw = self.estimator.estimate_retrieval_time(peer, desired).ok()?;
        Some(raw.clamp(0.0, 1.0))
    }

    /// Diagnostic snapshot: the fitted curve sampled at 50 points over `[0, 1]`, the data's
    /// x-range, the estimator's `len()`, and the number of per-peer adjustments.
    #[allow(clippy::type_complexity)]
    pub(crate) fn snapshot(&self) -> (Vec<(f64, f64)>, (f64, f64), usize, usize) {
        let model = &self.estimator;
        let curve = model.sampled_curve(0.0, 1.0, 50);
        let data_range = model.data_x_range();
        let sample_count = model.len();
        let adjusted_peers = model.peer_adjustments.len();
        (curve, data_range, sample_count, adjusted_peers)
    }
}
impl RelayState {
    /// Marks `peer` as the downstream hop and builds the request to send to it.
    ///
    /// The stored request's TTL is decremented (saturating at 0); that decremented request is
    /// both kept in `self.request` and returned together with `peer`. A [`ForwardAttempt`]
    /// stamped with `now` is inserted into `forward_attempts` under `peer`, replacing any
    /// previous attempt for that peer. `_ctx` is currently unused.
    fn forward_to_peer<C: RelayContext>(
        &mut self,
        _ctx: &C,
        peer: PeerKeyLocation,
        forward_attempts: &mut HashMap<PeerKeyLocation, ForwardAttempt>,
        now: Instant,
    ) -> (PeerKeyLocation, ConnectRequest) {
        let mut forward_req = self.request.clone();
        forward_req.ttl = forward_req.ttl.saturating_sub(1);
        let forward_snapshot = forward_req.clone();
        self.forwarded_to = Some(peer.clone());
        self.forwarded_at = Some(now);
        // Keep the local copy in sync with what was sent downstream.
        self.request = forward_req;
        forward_attempts.insert(
            peer.clone(),
            ForwardAttempt {
                peer: peer.clone(),
                desired: self.request.desired_location,
                sent_at: now,
            },
        );
        (peer, forward_snapshot)
    }
    /// Processes the current `self.request` and returns the actions the caller must perform.
    ///
    /// In order:
    /// 1. Marks the upstream address and this node's own address as visited.
    /// 2. If the joiner's address is unknown, fills it in from `upstream_addr` and (once per
    ///    relay) requests an observed-address notification.
    /// 3. If not yet forwarded and TTL > 0, asks `ctx` for a next hop. When one is found the
    ///    request is forwarded; if this node is within `NEAR_TERMINUS_DISTANCE` of the target
    ///    it may additionally accept the joiner with probability
    ///    `NEAR_TERMINUS_ACCEPT_PROB * (1 - d / NEAR_TERMINUS_DISTANCE)`.
    /// 4. With no next hop (terminus): accept if allowed; otherwise route uphill while
    ///    `uphill_budget > 0 && ttl >= 2`; otherwise set `rejected`.
    /// 5. If the request was already forwarded, nothing new happens (only a debug log).
    pub(crate) fn handle_request<C: RelayContext>(
        &mut self,
        ctx: &C,
        recency: &HashMap<PeerKeyLocation, Instant>,
        forward_attempts: &mut HashMap<PeerKeyLocation, ForwardAttempt>,
        estimator: &ConnectForwardEstimator,
        now: Instant,
    ) -> RelayActions {
        let mut actions = RelayActions::default();
        tracing::debug!(
            joiner_addr = ?self.request.joiner.peer_addr,
            upstream_addr = %self.upstream_addr,
            observed_sent = self.observed_sent,
            "RelayState::step() start"
        );
        // Never route back to the sender or to ourselves.
        self.request.visited.mark_visited(self.upstream_addr);
        if let Some(self_addr) = ctx.self_location().socket_addr() {
            self.request.visited.mark_visited(self_addr);
        }
        // The joiner starts with an unknown address; the first relay learns it from the
        // address the request arrived from.
        let discovered_joiner_addr = if self.request.joiner.peer_addr.is_unknown() {
            self.request.joiner.set_addr(self.upstream_addr);
            true
        } else {
            false
        };
        if discovered_joiner_addr {
            if let PeerAddr::Known(joiner_addr) = &self.request.joiner.peer_addr {
                if !self.observed_sent {
                    self.observed_sent = true;
                    let expected_location = crate::ring::Location::from_address(joiner_addr);
                    tracing::debug!(
                        joiner_addr = %joiner_addr,
                        expected_location = %expected_location,
                        upstream_addr = %self.upstream_addr,
                        "discovered joiner addr, emitting ObservedAddress"
                    );
                    actions.observed_address = Some((self.request.joiner.clone(), *joiner_addr));
                }
            }
        }
        // Only one forward per relay state, and only while hops remain.
        let can_forward = self.forwarded_to.is_none() && self.request.ttl > 0;
        let next_hop = if can_forward {
            ctx.select_next_hop(
                self.request.desired_location,
                &self.request.visited,
                recency,
                estimator,
            )
        } else {
            None
        };
        // Note: also true when forwarding was not possible (already forwarded or TTL 0).
        let is_terminus = next_hop.is_none();
        if let Some(next) = next_hop {
            let dist = ring_distance(next.location(), Some(self.request.desired_location));
            tracing::debug!(
                target = %self.request.desired_location,
                ttl = self.request.ttl,
                next_peer = %next.pub_key(),
                next_loc = ?next.location(),
                ring_distance_to_target = ?dist,
                "connect: forwarding join request to next hop (not accepting - not at terminus)"
            );
            actions.forward = Some(self.forward_to_peer(ctx, next, forward_attempts, now));
            // Near-terminus acceptance: a relay very close to the target may accept in
            // addition to forwarding, with a probability that shrinks linearly with distance.
            if !self.accepted_locally {
                let my_dist = ring_distance(
                    ctx.self_location().location(),
                    Some(self.request.desired_location),
                );
                if let Some(d) = my_dist {
                    if d < NEAR_TERMINUS_DISTANCE {
                        let accept_prob =
                            NEAR_TERMINUS_ACCEPT_PROB * (1.0 - d / NEAR_TERMINUS_DISTANCE);
                        let roll: f64 = GlobalRng::random_range(0.0..1.0);
                        if roll < accept_prob {
                            if let Ok(joiner_known) =
                                KnownPeerKeyLocation::try_from(&self.request.joiner)
                            {
                                if ctx.should_accept(&joiner_known) {
                                    self.accepted_locally = true;
                                    let self_loc = ctx.self_location();
                                    let acceptor = PeerKeyLocation::with_unknown_addr(
                                        self_loc.pub_key().clone(),
                                    );
                                    actions.accept = Some(AcceptOutcome {
                                        response: ConnectResponse { acceptor },
                                        joiner: self.request.joiner.clone(),
                                    });
                                    tracing::info!(
                                        acceptor_loc = ?self_loc.location(),
                                        joiner_loc = ?self.request.joiner.location(),
                                        ring_distance = d,
                                        accept_prob,
                                        "connect: near-terminus acceptance (also forwarding)"
                                    );
                                }
                            }
                        }
                    }
                }
            }
        }
        // The joiner's address was filled in above if missing, so failure here indicates a bug.
        let joiner_known = match KnownPeerKeyLocation::try_from(&self.request.joiner) {
            Ok(known) => Some(known),
            Err(e) => {
                tracing::error!(
                    joiner_pub_key = %e.pub_key,
                    upstream_addr = %self.upstream_addr,
                    "INTERNAL ERROR: joiner has unknown address at acceptance check - first relay should have filled this in"
                );
                None
            }
        };
        if is_terminus
            && !self.accepted_locally
            && self.forwarded_to.is_none()
            && joiner_known.as_ref().is_some_and(|j| ctx.should_accept(j))
        {
            // Terminus acceptance.
            self.accepted_locally = true;
            let self_loc = ctx.self_location();
            let acceptor = PeerKeyLocation::with_unknown_addr(self_loc.pub_key().clone());
            let dist = ring_distance(self_loc.location(), self.request.joiner.location());
            actions.accept = Some(AcceptOutcome {
                response: ConnectResponse { acceptor },
                joiner: self.request.joiner.clone(),
            });
            tracing::info!(
                acceptor_pub_key = %self_loc.pub_key(),
                joiner_pub_key = %self.request.joiner.pub_key(),
                acceptor_loc = ?self_loc.location(),
                joiner_loc = ?self.request.joiner.location(),
                ring_distance = ?dist,
                "connect: acceptance issued at terminus (acceptor addr will be filled by relay)"
            );
        } else if is_terminus && !self.accepted_locally && self.forwarded_to.is_none() {
            // Terminus that cannot accept: try an uphill hop. `ttl >= 2` keeps at least one
            // hop available after the decrement in `forward_to_peer`.
            if self.request.uphill_budget > 0 && self.request.ttl >= 2 {
                let uphill_hop = ctx.select_uphill_hop(
                    self.request.desired_location,
                    &self.request.visited,
                    recency,
                );
                if let Some(uphill_peer) = uphill_hop {
                    let dist =
                        ring_distance(uphill_peer.location(), Some(self.request.desired_location));
                    tracing::debug!(
                        target = %self.request.desired_location,
                        ttl = self.request.ttl,
                        uphill_budget = self.request.uphill_budget,
                        uphill_peer = %uphill_peer.pub_key(),
                        uphill_loc = ?uphill_peer.location(),
                        ring_distance_to_target = ?dist,
                        "connect: at terminus but cannot accept, routing uphill"
                    );
                    let (peer, mut fwd_req) =
                        self.forward_to_peer(ctx, uphill_peer, forward_attempts, now);
                    // Spend one unit of uphill budget on both the outgoing and stored copy.
                    fwd_req.uphill_budget = fwd_req.uphill_budget.saturating_sub(1);
                    self.request.uphill_budget = self.request.uphill_budget.saturating_sub(1);
                    actions.forward = Some((peer, fwd_req));
                } else {
                    tracing::info!(
                        target = %self.request.desired_location,
                        ttl = self.request.ttl,
                        uphill_budget = self.request.uphill_budget,
                        visited = ?self.request.visited,
                        "connect: at terminus, cannot accept, no uphill peers available — rejecting"
                    );
                    actions.rejected = true;
                }
            } else {
                tracing::info!(
                    target = %self.request.desired_location,
                    ttl = self.request.ttl,
                    uphill_budget = self.request.uphill_budget,
                    visited = ?self.request.visited,
                    "connect: at terminus, cannot accept, uphill budget or TTL exhausted — rejecting"
                );
                actions.rejected = true;
            }
        } else if is_terminus && self.forwarded_to.is_some() {
            // A repeat of an already-forwarded request: nothing to do but wait downstream.
            tracing::debug!(
                target = %self.request.desired_location,
                forwarded_to = ?self.forwarded_to,
                "connect: at terminus but already forwarded - waiting for response"
            );
        }
        actions
    }
}
/// [`RelayContext`] backed by a live [`OpManager`].
///
/// The node's own location is captured once at construction.
pub(crate) struct RelayEnv<'a> {
    pub op_manager: &'a OpManager,
    self_location: PeerKeyLocation,
}
impl<'a> RelayEnv<'a> {
    /// Wraps `op_manager`, snapshotting the connection manager's current own location.
    pub fn new(op_manager: &'a OpManager) -> Self {
        Self {
            self_location: op_manager.ring.connection_manager.own_location(),
            op_manager,
        }
    }
}
impl RelayContext for RelayEnv<'_> {
    fn self_location(&self) -> &PeerKeyLocation {
        &self.self_location
    }

    /// Refuses joiners whose address is in `op_manager.blocked_addresses`; otherwise defers to
    /// the connection manager's `should_accept` for the joiner's location and address.
    fn should_accept(&self, joiner: &KnownPeerKeyLocation) -> bool {
        let addr = joiner.socket_addr();
        let location = joiner.location();
        if let Some(blocked) = &self.op_manager.blocked_addresses {
            if blocked.contains(&addr) {
                tracing::info!(
                    joiner_addr = %addr,
                    "connect: rejecting join from blocked peer at routing level"
                );
                return false;
            }
        }
        self.op_manager
            .ring
            .connection_manager
            .should_accept(location, addr)
    }

    /// Picks the next hop toward `desired_location`.
    ///
    /// Candidates come from the connection manager, excluding visited peers and this node.
    /// Peers in their recency cooldown, and located peers that are farther from the target
    /// than this node, are skipped. Remaining peers with an estimator score are ranked by
    /// `estimator score × acceptor reliability` (reliability defaults to 0.5 without an
    /// address). Peers without an estimate become unscored fallbacks.
    ///
    /// Returns `None` (terminus) when nothing remains. It also returns `None` when only scored
    /// peers remain and all of them have acceptor reliability below `MIN_FORWARD_SCORE`, so
    /// the caller can escalate to uphill routing.
    fn select_next_hop(
        &self,
        desired_location: Location,
        visited: &VisitedPeers,
        recency: &HashMap<PeerKeyLocation, Instant>,
        estimator: &ConnectForwardEstimator,
    ) -> Option<PeerKeyLocation> {
        let now = Instant::now();
        let skip = SkipListWithSelf {
            visited,
            self_addr: self.self_location.socket_addr(),
        };
        let router = self.op_manager.ring.router.read();
        let conn_mgr = &self.op_manager.ring.connection_manager;
        let candidates = conn_mgr.routing_candidates(
            desired_location,
            None,
            skip,
            false,
        );
        let my_distance = self
            .self_location
            .location()
            .map(|loc| loc.distance(desired_location));
        let mut scored: Vec<(f64, PeerKeyLocation)> = Vec::with_capacity(candidates.len());
        let mut fallbacks: Vec<PeerKeyLocation> = Vec::with_capacity(candidates.len());
        for cand in candidates {
            if let Some(ts) = recency.get(&cand) {
                if now.duration_since(*ts) < RECENCY_COOLDOWN {
                    continue;
                }
            }
            // Never route away from the target when both distances are known.
            if let (Some(cand_loc), Some(my_dist)) = (cand.location(), my_distance) {
                if cand_loc.distance(desired_location) > my_dist {
                    continue;
                }
            }
            let reliability = cand
                .socket_addr()
                .map(|addr| conn_mgr.peer_acceptor_reliability(addr, now))
                .unwrap_or(0.5);
            // `estimate` already returns `None` for peers without a location, so those
            // (and peers the model cannot score) become unscored fallbacks.
            match estimator.estimate(&cand, desired_location) {
                Some(estimator_score) => scored.push((estimator_score * reliability, cand)),
                None => fallbacks.push(cand),
            }
        }
        filter_low_score_peers(&mut scored, fallbacks.len());
        if !scored.is_empty() && fallbacks.is_empty() {
            let all_unreliable = scored.iter().all(|(_, cand)| {
                cand.socket_addr()
                    .map(|addr| conn_mgr.peer_acceptor_reliability(addr, now))
                    .unwrap_or(0.5)
                    < MIN_FORWARD_SCORE
            });
            if all_unreliable {
                tracing::debug!(
                    candidates = scored.len(),
                    "connect: all closer peers have low acceptor reliability, escalating to uphill"
                );
                return None;
            }
        }
        if !scored.is_empty() {
            // NaN-tolerant maximum: `f64::max` ignores NaN operands, whereas the previous
            // `partial_cmp(..).unwrap()` panicked if any score was NaN. NaN-scored
            // candidates never match `best_score` below and so are never chosen here.
            let best_score = scored
                .iter()
                .map(|(s, _)| *s)
                .fold(f64::NEG_INFINITY, f64::max);
            let best: Vec<_> = scored
                .into_iter()
                .filter(|(s, _)| (*s - best_score).abs() < f64::EPSILON)
                .map(|(_, c)| c)
                .collect();
            if !best.is_empty() {
                return router.select_peer(best.iter(), desired_location).cloned();
            }
        }
        if fallbacks.is_empty() {
            None
        } else {
            router
                .select_peer(fallbacks.iter(), desired_location)
                .cloned()
        }
    }

    /// Picks a fallback hop for a terminus that cannot accept.
    ///
    /// No distance-to-target restriction is applied. Candidates outside the recency cooldown
    /// are kept if their acceptor reliability is within `UPHILL_RELIABILITY_TOLERANCE` of the
    /// best one. Among those, peers within `2 × NEAR_TERMINUS_DISTANCE` of the target are
    /// preferred, and the router chooses within the preferred group.
    fn select_uphill_hop(
        &self,
        desired_location: Location,
        visited: &VisitedPeers,
        recency: &HashMap<PeerKeyLocation, Instant>,
    ) -> Option<PeerKeyLocation> {
        let now = Instant::now();
        let skip = SkipListWithSelf {
            visited,
            self_addr: self.self_location.socket_addr(),
        };
        let router = self.op_manager.ring.router.read();
        let conn_mgr = &self.op_manager.ring.connection_manager;
        let candidates = conn_mgr.routing_candidates(
            desired_location,
            None,
            skip,
            false,
        );
        let mut best_reliability = 0.0_f64;
        let scored: Vec<(f64, PeerKeyLocation)> = candidates
            .into_iter()
            .filter_map(|cand| {
                if let Some(ts) = recency.get(&cand) {
                    if now.duration_since(*ts) < RECENCY_COOLDOWN {
                        return None;
                    }
                }
                let reliability = cand
                    .socket_addr()
                    .map(|addr| conn_mgr.peer_acceptor_reliability(addr, now))
                    .unwrap_or(0.5);
                best_reliability = best_reliability.max(reliability);
                Some((reliability, cand))
            })
            .collect();
        if scored.is_empty() {
            tracing::debug!(
                target = %desired_location,
                "connect: no uphill candidates available"
            );
            return None;
        }
        let close_threshold = NEAR_TERMINUS_DISTANCE * 2.0;
        let (close, far): (Vec<_>, Vec<_>) = scored
            .into_iter()
            .filter(|(r, _)| best_reliability - *r < UPHILL_RELIABILITY_TOLERANCE)
            .map(|(_, c)| c)
            .partition(|cand: &PeerKeyLocation| {
                cand.location()
                    .map(|loc| loc.distance(desired_location).as_f64() < close_threshold)
                    .unwrap_or(false)
            });
        if !close.is_empty() {
            tracing::debug!(
                target = %desired_location,
                close_count = close.len(),
                far_count = far.len(),
                "connect: preferring close uphill peer"
            );
            router.select_peer(close.iter(), desired_location).cloned()
        } else {
            router.select_peer(far.iter(), desired_location).cloned()
        }
    }
}
/// A peer that newly accepted the joiner, as reported by `JoinerState::register_acceptance`.
#[derive(Debug)]
#[allow(dead_code)]
pub struct AcceptedPeer {
    /// The accepting peer, copied from `ConnectResponse::acceptor`.
    pub peer: PeerKeyLocation,
}
/// Result of registering one `ConnectResponse` on the joiner side.
#[derive(Debug, Default)]
#[allow(dead_code)]
pub struct JoinerAcceptance {
    /// Set only when the acceptor had not been registered before.
    pub new_acceptor: Option<AcceptedPeer>,
    /// True once the number of distinct acceptors reaches `target_connections`.
    pub satisfied: bool,
    /// True when this response produced the very first distinct acceptor.
    pub assigned_location: bool,
}
#[allow(dead_code)] impl JoinerState {
pub(crate) fn register_acceptance(
&mut self,
response: &ConnectResponse,
now: Instant,
) -> JoinerAcceptance {
let mut acceptance = JoinerAcceptance::default();
if self.accepted.insert(response.acceptor.clone()) {
self.last_progress = now;
acceptance.new_acceptor = Some(AcceptedPeer {
peer: response.acceptor.clone(),
});
acceptance.assigned_location = self.accepted.len() == 1;
}
acceptance.satisfied = self.accepted.len() >= self.target_connections;
acceptance
}
pub(crate) fn update_observed_address(&mut self, address: SocketAddr, now: Instant) {
self.observed_address = Some(address);
self.last_progress = now;
}
}
/// Test-only connect operation: wraps a [`ConnectState`] plus the bookkeeping needed to
/// drive `RelayState::handle_request` and to feed forward outcomes into the shared estimator.
#[cfg(test)]
#[derive(Debug, Clone)]
pub(crate) struct ConnectOp {
    pub(crate) id: Transaction,
    pub(crate) state: Option<ConnectState>,
    /// Per-peer timestamps passed to hop selection; peers inside `RECENCY_COOLDOWN` are skipped.
    recency: HashMap<PeerKeyLocation, Instant>,
    /// Outstanding forwards awaiting an outcome, keyed by downstream peer.
    forward_attempts: HashMap<PeerKeyLocation, ForwardAttempt>,
    connect_forward_estimator: Arc<RwLock<ConnectForwardEstimator>>,
}
#[cfg(test)]
impl ConnectOp {
    /// Clears the outstanding attempt for `peer` and records the outcome in the shared
    /// estimator (failures are also logged).
    fn record_forward_outcome(&mut self, peer: &PeerKeyLocation, desired: Location, success: bool) {
        self.forward_attempts.remove(peer);
        if !success {
            tracing::debug!(
                peer = %peer.pub_key,
                desired = %desired,
                "connect forward failed, recording failure in estimator"
            );
        }
        self.connect_forward_estimator
            .write()
            .record(peer, desired, success);
    }

    /// Records every forward attempt older than `FORWARD_ATTEMPT_TIMEOUT` as a failure.
    ///
    /// Expired attempts are processed in public-key order so the sequence of estimator
    /// updates does not depend on `HashMap` iteration order.
    pub(crate) fn expire_forward_attempts(&mut self, now: Instant) {
        let mut expired = Vec::with_capacity(self.forward_attempts.len());
        for (peer, attempt) in self.forward_attempts.iter() {
            if now.duration_since(attempt.sent_at) >= FORWARD_ATTEMPT_TIMEOUT {
                expired.push((peer.clone(), attempt.desired));
            }
        }
        expired.sort_by(|a, b| a.0.pub_key.cmp(&b.0.pub_key));
        for (peer, desired) in expired {
            if let Some(attempt) = self.forward_attempts.remove(&peer) {
                self.record_forward_outcome(&attempt.peer, desired, false);
            }
        }
    }

    /// Creates a joiner-side op waiting for `target_connections` acceptances.
    pub(crate) fn new_joiner(
        id: Transaction,
        target_connections: usize,
        observed_address: Option<SocketAddr>,
        connect_forward_estimator: Arc<RwLock<ConnectForwardEstimator>>,
    ) -> Self {
        let started_without_address = observed_address.is_none();
        let state = ConnectState::WaitingForResponses(JoinerState {
            target_connections,
            observed_address,
            accepted: HashSet::new(),
            last_progress: Instant::now(),
            started_without_address,
        });
        Self {
            id,
            state: Some(state),
            recency: HashMap::new(),
            forward_attempts: HashMap::new(),
            connect_forward_estimator,
        }
    }

    /// Creates a relay-side op for a request received from `upstream_addr`.
    ///
    /// The request's visited set is bound to `id`, and its peer-supplied uphill budget is
    /// capped at `DEFAULT_UPHILL_BUDGET`.
    pub(crate) fn new_relay(
        id: Transaction,
        upstream_addr: SocketAddr,
        mut request: ConnectRequest,
        connect_forward_estimator: Arc<RwLock<ConnectForwardEstimator>>,
    ) -> Self {
        request.visited = request.visited.with_transaction(&id);
        request.uphill_budget = request.uphill_budget.min(DEFAULT_UPHILL_BUDGET);
        let state = ConnectState::Relaying(Box::new(RelayState {
            upstream_addr,
            request,
            forwarded_to: None,
            forwarded_at: None,
            observed_sent: false,
            accepted_locally: false,
            response_forwarded: false,
        }));
        Self {
            id,
            state: Some(state),
            recency: HashMap::new(),
            forward_attempts: HashMap::new(),
            connect_forward_estimator,
        }
    }

    /// Starts a join through `target`: returns the new transaction, a joiner op, and the
    /// initial request message (see `prepare_join_request`).
    pub(crate) fn initiate_join_request(
        own: PeerKeyLocation,
        target: PeerKeyLocation,
        desired_location: Location,
        ttl: u8,
        target_connections: usize,
        connect_forward_estimator: Arc<RwLock<ConnectForwardEstimator>>,
        exclude_addrs: &[SocketAddr],
    ) -> (Transaction, Self, ConnectMsg) {
        let tx = Transaction::new::<ConnectMsg>();
        let msg = prepare_join_request(tx, &own, &target, desired_location, ttl, exclude_addrs);
        let op = ConnectOp::new_joiner(
            tx,
            target_connections,
            own.socket_addr(),
            connect_forward_estimator,
        );
        (tx, op, msg)
    }

    /// Handles an incoming join request, switching this op into relaying mode if needed.
    ///
    /// Expired forward attempts are recorded first. The incoming request is normalised the
    /// same way as in `new_relay`: its visited set is bound to this op's id and its
    /// peer-supplied uphill budget is capped at `DEFAULT_UPHILL_BUDGET`. The request then
    /// replaces the stored one and is processed by `RelayState::handle_request`.
    pub(crate) fn handle_request<C: RelayContext>(
        &mut self,
        ctx: &C,
        upstream_addr: SocketAddr,
        request: ConnectRequest,
        estimator: &ConnectForwardEstimator,
        now: Instant,
    ) -> RelayActions {
        self.expire_forward_attempts(now);
        if !matches!(self.state, Some(ConnectState::Relaying(_))) {
            self.state = Some(ConnectState::Relaying(Box::new(RelayState {
                upstream_addr,
                request: request.clone(),
                forwarded_to: None,
                forwarded_at: None,
                observed_sent: false,
                accepted_locally: false,
                response_forwarded: false,
            })));
        }
        match self.state.as_mut() {
            Some(ConnectState::Relaying(state)) => {
                state.upstream_addr = upstream_addr;
                let mut request = request;
                request.visited = request.visited.with_transaction(&self.id);
                // Same cap as `new_relay`: an upstream peer must not be able to grant itself
                // more uphill hops than the protocol default.
                request.uphill_budget = request.uphill_budget.min(DEFAULT_UPHILL_BUDGET);
                state.request = request;
                state.handle_request(
                    ctx,
                    &self.recency,
                    &mut self.forward_attempts,
                    estimator,
                    now,
                )
            }
            _ => RelayActions::default(),
        }
    }
}
/// Skip list for routing-candidate selection: an address is skipped if it is this node's own
/// address or (probably) already visited by the request.
struct SkipListWithSelf<'a> {
    visited: &'a VisitedPeers,
    self_addr: Option<SocketAddr>,
}
impl Contains<SocketAddr> for SkipListWithSelf<'_> {
    fn has_element(&self, target: SocketAddr) -> bool {
        // Own address short-circuits; otherwise fall back to the visited filter.
        self.self_addr == Some(target) || self.visited.probably_visited(target)
    }
}
impl Contains<&SocketAddr> for SkipListWithSelf<'_> {
    fn has_element(&self, target: &SocketAddr) -> bool {
        <Self as Contains<SocketAddr>>::has_element(self, *target)
    }
}
/// Ring distance between two optional locations as an `f64`, or `None` if either is unknown.
fn ring_distance(a: Option<Location>, b: Option<Location>) -> Option<f64> {
    a.zip(b).map(|(from, to)| from.distance(to).as_f64())
}
/// Acceptor side: prepares for, and initiates, a connection to an accepted joiner.
///
/// The node is first told to expect a connection from the joiner's address, then asked to
/// connect to the joiner. The connect outcome is only logged, from a detached task. A joiner
/// without a known address is an internal invariant violation: it is logged and `Ok(())`
/// is returned without doing anything.
///
/// # Errors
/// Propagates failures from `notify_node_event`.
pub(super) async fn dispatch_expect_connection_from(
    op_manager: &OpManager,
    tx: Transaction,
    peer: PeerKeyLocation,
) -> Result<(), OpError> {
    let addr = match peer.socket_addr() {
        Some(known_addr) => known_addr,
        None => {
            tracing::error!(
                tx = %tx,
                joiner_pub_key = %peer.pub_key(),
                "INTERNAL ERROR: acceptor reached dispatch_expect_connection_from with unknown joiner address — handle_request acceptance invariant violated"
            );
            return Ok(());
        }
    };
    tracing::info!(
        joiner_addr = %addr,
        tx = %tx,
        "connect: acceptor accepted joiner, initiating hole punch"
    );
    op_manager
        .notify_node_event(NodeEvent::ExpectPeerConnection { addr })
        .await?;

    let (callback_tx, mut outcome_rx) = mpsc::channel(1);
    op_manager
        .notify_node_event(NodeEvent::ConnectPeer {
            peer: peer.clone(),
            tx,
            callback: callback_tx,
            is_gw: false,
        })
        .await?;

    // Fire-and-forget: the outcome is informational only.
    GlobalExecutor::spawn(async move {
        match outcome_rx.recv().await {
            Some(Ok((connected_peer, _))) => {
                tracing::info!(
                    %connected_peer,
                    tx=%tx,
                    "connect: acceptor hole-punch connection succeeded"
                );
            }
            Some(Err(_)) => {
                tracing::debug!(
                    %peer,
                    tx=%tx,
                    "connect: acceptor hole-punch connection failed (joiner may connect to us instead)"
                );
            }
            None => {}
        }
    });
    Ok(())
}
/// Builds the initial `ConnectMsg::Request` a joiner sends to `target`.
///
/// The visited set is seeded with the joiner's own address, the target's address, and every
/// address in `exclude_addrs`, so routing never loops back to them. The joiner is advertised
/// with an unknown address (the first relay fills it in), and the uphill budget starts at
/// `DEFAULT_UPHILL_BUDGET`.
pub(crate) fn prepare_join_request(
    tx: Transaction,
    own: &PeerKeyLocation,
    target: &PeerKeyLocation,
    desired_location: Location,
    ttl: u8,
    exclude_addrs: &[SocketAddr],
) -> ConnectMsg {
    let mut visited = VisitedPeers::new(&tx);
    let known_endpoints = own.socket_addr().into_iter().chain(target.socket_addr());
    for addr in known_endpoints.chain(exclude_addrs.iter().copied()) {
        visited.mark_visited(addr);
    }
    if !exclude_addrs.is_empty() {
        tracing::debug!(
            excluded_addrs = exclude_addrs.len(),
            "connect: pre-populated visited filter with excluded peer addresses (failed + connected)"
        );
    }
    ConnectMsg::Request {
        id: tx,
        payload: ConnectRequest {
            desired_location,
            joiner: PeerKeyLocation::with_unknown_addr(own.pub_key.clone()),
            ttl,
            visited,
            uphill_budget: DEFAULT_UPHILL_BUDGET,
        },
    }
}
/// Runs one join attempt through `gateway` and waits for the client connect task to finish.
///
/// Target selection: with at least `GAP_TARGET_THRESHOLD` connections, a known own location,
/// and at least that many known peer locations, the request aims at a gap in this node's
/// neighbourhood (`gap_target`). Otherwise it aims at the node's own location (random if
/// unknown). In that case `increment_connect_jitter_failures` is called, and once it returns
/// more than 1 the target is randomly offset by up to ±0.25 (the magnitude doubles per
/// failure, starting at 0.05).
///
/// # Errors
/// `ConnectionError::LocationUnknown` if the gateway has no location or address;
/// `ConnectionError::UnwantedConnection` if the connection manager declines the gateway;
/// otherwise whatever `op_ctx_task::start_client_connect` returns.
#[tracing::instrument(fields(peer = %op_manager.ring.connection_manager.pub_key), skip_all)]
pub(crate) async fn join_ring_request(
    gateway: &PeerKeyLocation,
    op_manager: &OpManager,
    overall_timeout: Option<std::time::Duration>,
) -> Result<(), OpError> {
    use crate::node::ConnectionError;

    const GAP_TARGET_THRESHOLD: usize = 3;

    let Some(gateway_location) = gateway.location() else {
        tracing::error!(
            phase = "error",
            "Gateway location not found, this should not be possible, report an error"
        );
        return Err(OpError::ConnError(ConnectionError::LocationUnknown));
    };
    let Some(gateway_addr) = gateway.socket_addr() else {
        tracing::error!(phase = "error", "Gateway address not found");
        return Err(OpError::ConnError(ConnectionError::LocationUnknown));
    };

    let conn_mgr = &op_manager.ring.connection_manager;
    if !conn_mgr.should_accept(gateway_location, gateway_addr) {
        return Err(OpError::ConnError(ConnectionError::UnwantedConnection));
    }

    let own = conn_mgr.own_location();
    let base_location = own.location().unwrap_or_else(Location::random);
    let now = Instant::now();
    let current_connections = conn_mgr.connection_count();

    let apply_bootstrap_jitter = |base: Location| -> Location {
        let failures = conn_mgr.increment_connect_jitter_failures(now);
        if failures <= 1 {
            return base;
        }
        let magnitude = (0.05 * (1u32 << (failures - 2).min(3)) as f64).min(0.25);
        let uniform_01 = (GlobalRng::random_u64() as f64) / (u64::MAX as f64);
        let offset = magnitude * (2.0 * uniform_01 - 1.0);
        let jittered = (base.as_f64() + offset).rem_euclid(1.0);
        tracing::info!(
            failures,
            magnitude,
            base = %base,
            jittered,
            "connect: applying location jitter after consecutive failures"
        );
        Location::new(jittered)
    };

    let gap_placement = match own.location() {
        Some(my_loc) if current_connections >= GAP_TARGET_THRESHOLD => {
            let distances: Vec<f64> = conn_mgr
                .location_for_all_peers()
                .into_iter()
                .map(|peer_loc| my_loc.distance(peer_loc).as_f64())
                .collect();
            if distances.len() >= GAP_TARGET_THRESHOLD {
                let target =
                    crate::topology::small_world_rand::gap_target(my_loc, distances.into_iter());
                tracing::info!(
                    current_connections,
                    own_location = %my_loc,
                    target = %target,
                    distance = my_loc.distance(target).as_f64(),
                    "connect: using gap_target for Kleinberg-optimal placement"
                );
                Some(target)
            } else {
                None
            }
        }
        _ => None,
    };
    let desired_location =
        gap_placement.unwrap_or_else(|| apply_bootstrap_jitter(base_location));

    let tx = Transaction::new::<ConnectMsg>();
    op_ctx_task::start_client_connect(
        tx,
        gateway.clone(),
        gateway_addr,
        op_manager,
        own,
        desired_location,
        overall_timeout,
    )
    .await
}
/// Returns the gateway's socket address, logging and failing with
/// `ConnectionError::LocationUnknown` when it is not known.
pub(crate) fn resolve_probe_gateway_addr(
    gateway: &PeerKeyLocation,
) -> Result<std::net::SocketAddr, OpError> {
    use crate::node::ConnectionError;
    match gateway.socket_addr() {
        Some(addr) => Ok(addr),
        None => {
            tracing::error!(phase = "error", "Gateway address not found");
            Err(OpError::ConnError(ConnectionError::LocationUnknown))
        }
    }
}
/// Sends a plain connect request to `gateway` (no gap targeting, jitter, or timeout) aimed at
/// this node's own location, or a random one if unknown.
///
/// Unlike `join_ring_request`, it skips the gateway location / `should_accept` checks and only
/// requires the gateway's address.
///
/// # Errors
/// `ConnectionError::LocationUnknown` if the gateway address is unknown, otherwise whatever
/// `op_ctx_task::start_client_connect` returns.
#[tracing::instrument(fields(peer = %op_manager.ring.connection_manager.pub_key), skip_all)]
pub(crate) async fn gateway_version_probe(
    gateway: &PeerKeyLocation,
    op_manager: &OpManager,
) -> Result<(), OpError> {
    let gateway_addr = resolve_probe_gateway_addr(gateway)?;
    let own = op_manager.ring.connection_manager.own_location();
    let desired_location = own.location().unwrap_or_else(Location::random);
    op_ctx_task::start_client_connect(
        Transaction::new::<ConnectMsg>(),
        gateway.clone(),
        gateway_addr,
        op_manager,
        own,
        desired_location,
        None,
    )
    .await
}
/// Upper bound on how long the initial join loop sleeps while every unconnected gateway is in
/// backoff but the node already has open connections. The actual cap is 80% of this value
/// plus random jitter of up to 20%. With no open connections the full backoff is awaited.
pub(crate) const GATEWAY_BACKOFF_POLL_CAP: Duration = Duration::from_secs(30);
pub(crate) async fn initial_join_procedure(
op_manager: Arc<OpManager>,
gateways: &[PeerKeyLocation],
) -> Result<JoinHandle<()>, OpError> {
let number_of_parallel_connections = {
let max_potential_conns_per_gw = op_manager.ring.max_hops_to_live;
let needed_to_cover_max =
op_manager.ring.connection_manager.max_connections / max_potential_conns_per_gw;
gateways.iter().take(needed_to_cover_max).count().max(2)
};
let gateways = gateways.to_vec();
let handle = GlobalExecutor::spawn(async move {
if gateways.is_empty() {
tracing::warn!(
phase = "error",
"No gateways available, aborting join procedure"
);
return;
}
const BASE_WAIT_SECS: u64 = 1;
const LONG_WAIT_SECS: u64 = 30;
const CACHED_PEER_TIMEOUT: Duration = Duration::from_secs(5);
let bootstrap_threshold = op_manager.ring.connection_manager.min_connections;
tracing::info!(
bootstrap_threshold,
"Starting initial join procedure with {} gateways",
gateways.len()
);
if let Some(ref cache_dir) = op_manager.ring.peer_cache_dir {
let cache = crate::ring::peer_cache::PeerCache::load(
cache_dir,
op_manager.ring.time_source.as_ref(),
);
if !cache.peers.is_empty() {
tracing::info!(
cached_peers = cache.peers.len(),
"Attempting fast reconnection to cached peers"
);
let cached_peer_locs: Vec<PeerKeyLocation> = cache
.peers
.iter()
.take(number_of_parallel_connections)
.map(|cp| PeerKeyLocation::new(cp.pub_key.clone(), cp.addr))
.collect();
let mut cached_futs: FuturesUnordered<_> = cached_peer_locs
.iter()
.map(|peer| {
let op_mgr = op_manager.clone();
let peer = peer.clone();
GlobalExecutor::spawn(async move {
match join_ring_request(&peer, &op_mgr, Some(CACHED_PEER_TIMEOUT)).await
{
Ok(()) => {
tracing::info!(peer = %peer, "Reconnected to cached peer");
true
}
Err(e) => {
tracing::debug!(
peer = %peer, error = %e,
"Cached peer reconnection failed"
);
false
}
}
})
})
.collect();
let mut successes = 0usize;
while let Some(result) = cached_futs.next().await {
if matches!(result, Ok(true)) {
successes += 1;
}
}
tracing::info!(
successes,
attempted = cached_peer_locs.len(),
"Cached peer reconnection complete"
);
}
}
loop {
let open_conns = op_manager.ring.open_connections();
let unconnected_gateways: Vec<_> =
op_manager.ring.is_not_connected(gateways.iter()).collect();
tracing::debug!(
"Connection status: open_connections = {}, unconnected_gateways = {}",
open_conns,
unconnected_gateways.len()
);
let unconnected_count = unconnected_gateways.len();
if open_conns == 0 && unconnected_count == 0 {
let gateway_addrs: Vec<_> =
gateways.iter().filter_map(|gw| gw.socket_addr()).collect();
op_manager
.ring
.connection_manager
.clear_pending_reservations_for(&gateway_addrs);
tracing::warn!(
total_gateways = gateways.len(),
"Fully isolated: cleared stale gateway reservations to unblock recovery"
);
} else if open_conns < bootstrap_threshold && unconnected_count == 0 {
tracing::info!(
open_conns,
total_gateways = gateways.len(),
"Below bootstrap threshold but all gateways appear connected/pending — \
waiting for handshakes to complete or pending reservations to expire"
);
}
let use_connected_as_routers = open_conns < bootstrap_threshold
&& unconnected_count == 0
&& bootstrap_threshold.saturating_sub(open_conns) > gateways.len();
if open_conns < bootstrap_threshold && unconnected_count > 0 {
let (eligible_gateways, min_backoff) = {
let backoff = op_manager.gateway_backoff.lock();
let mut min_remaining: Option<Duration> = None;
let eligible: Vec<_> = unconnected_gateways
.into_iter()
.filter(|gw| {
if let Some(addr) = gw.socket_addr() {
if backoff.is_in_backoff(addr) {
if let Some(remaining) = backoff.remaining_backoff(addr) {
min_remaining = Some(
min_remaining
.map(|m| m.min(remaining))
.unwrap_or(remaining),
);
}
tracing::debug!(
gateway = %gw,
"Skipping gateway due to backoff from previous failures"
);
return false;
}
}
true
})
.collect();
(eligible, min_remaining)
};
let eligible_count = eligible_gateways.len();
if eligible_count == 0 {
if let Some(min_wait) = min_backoff {
let effective_wait = if open_conns > 0 {
let jitter_ms = GlobalRng::random_range(
0u64..(GATEWAY_BACKOFF_POLL_CAP.as_millis() / 5) as u64,
);
let cap = GATEWAY_BACKOFF_POLL_CAP.mul_f64(0.8)
+ Duration::from_millis(jitter_ms);
min_wait.min(cap)
} else {
min_wait
};
tracing::info!(
wait_secs = effective_wait.as_secs(),
gateway_backoff_secs = min_wait.as_secs(),
total_gateways = unconnected_count,
open_connections = open_conns,
"All gateways in backoff, waiting before retry"
);
tokio::select! {
_ = tokio::time::sleep(effective_wait) => {},
_ = op_manager.gateway_backoff_cleared.notified() => {
tracing::info!("Gateway backoff cleared externally, retrying immediately");
},
}
continue;
}
}
tracing::info!(
"Below bootstrap threshold ({} < {}), attempting to connect to {} gateways (skipped {} in backoff)",
open_conns,
bootstrap_threshold,
number_of_parallel_connections.min(eligible_count),
unconnected_count - eligible_count
);
let select_all = FuturesUnordered::new();
for gateway in eligible_gateways
.into_iter()
.shuffle()
.take(number_of_parallel_connections)
{
tracing::info!(%gateway, "Attempting connection to gateway");
let op_manager = op_manager.clone();
select_all.push(async move {
(join_ring_request(gateway, &op_manager, None).await, gateway)
});
}
select_all
.for_each(|(res, gateway)| async move {
if let Err(error) = res {
if !matches!(
error,
OpError::ConnError(
crate::node::ConnectionError::UnwantedConnection
)
) {
tracing::error!(
%gateway,
%error,
"Failed while attempting connection to gateway"
);
}
}
})
.await;
} else if use_connected_as_routers {
let eligible: Vec<_> = {
let backoff = op_manager.gateway_backoff.lock();
gateways
.iter()
.filter(|gw| {
gw.socket_addr()
.map(|addr| !backoff.is_in_backoff(addr))
.unwrap_or(false)
})
.collect()
};
if !eligible.is_empty() {
tracing::info!(
eligible = eligible.len(),
total_gateways = gateways.len(),
"Routing CONNECTs through connected gateways"
);
let select_all = FuturesUnordered::new();
for gateway in eligible
.into_iter()
.shuffle()
.take(number_of_parallel_connections)
{
let gateway = gateway.clone();
let op_manager = op_manager.clone();
select_all.push(async move {
(
join_ring_request(&gateway, &op_manager, None).await,
gateway,
)
});
}
select_all
.for_each(|(res, gateway)| async move {
if let Err(error) = res {
if !matches!(
error,
OpError::ConnError(
crate::node::ConnectionError::UnwantedConnection
)
) {
tracing::error!(
%gateway,
%error,
"Failed while routing CONNECT through gateway"
);
}
}
})
.await;
}
} else if open_conns >= bootstrap_threshold {
tracing::trace!(
"Have {} connections (>= threshold of {}), not attempting gateway connections",
open_conns,
bootstrap_threshold
);
}
let jitter_ms = GlobalRng::random_u64() % 2000; let base_wait_ms = if open_conns == 0 {
tracing::debug!(
"No connections yet, waiting ~{}s before retry",
BASE_WAIT_SECS
);
BASE_WAIT_SECS * 1000
} else if open_conns < bootstrap_threshold {
let wait = BASE_WAIT_SECS * 3 * 1000;
tracing::debug!(
"Have {} connections (below threshold of {}), waiting ~{}ms",
open_conns,
bootstrap_threshold,
wait + jitter_ms
);
wait
} else {
tracing::trace!(
"Connection pool healthy ({} connections), waiting ~{}s",
open_conns,
LONG_WAIT_SECS
);
LONG_WAIT_SECS * 1000
};
tokio::time::sleep(Duration::from_millis(base_wait_ms + jitter_ms)).await;
}
});
Ok(handle)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::transport::TransportKeypair;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
/// A gateway constructed with a concrete socket address resolves to exactly that address.
#[test]
fn resolve_probe_gateway_addr_returns_known_socket() {
    let expected = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12345);
    let gateway = PeerKeyLocation::new(TransportKeypair::new().public().clone(), expected);
    let actual = resolve_probe_gateway_addr(&gateway).expect("known address resolves");
    assert_eq!(actual, expected);
}
/// Without a known socket address there is nothing to probe, so resolution must fail with
/// `ConnectionError::LocationUnknown`.
#[test]
fn resolve_probe_gateway_addr_errors_on_unknown_address() {
    let gateway = PeerKeyLocation::with_unknown_addr(TransportKeypair::new().public().clone());
    let err = resolve_probe_gateway_addr(&gateway)
        .expect_err("unknown address must fail to resolve");
    let is_location_unknown = matches!(
        err,
        OpError::ConnError(crate::node::ConnectionError::LocationUnknown)
    );
    assert!(is_location_unknown, "expected LocationUnknown, got {err:?}");
}
/// Scripted relay context for unit tests: every answer it gives is fixed up front via the
/// builder methods below rather than derived from a live ring.
struct TestRelayContext {
    /// Location reported as this relay's own position.
    self_loc: PeerKeyLocation,
    /// Whether this relay is willing to accept joiners.
    accept: bool,
    /// Candidate returned when asked for a closer next hop.
    next_hop: Option<PeerKeyLocation>,
    /// Candidate returned when asked for an uphill hop.
    uphill_hop: Option<PeerKeyLocation>,
}
impl TestRelayContext {
    /// Context that accepts joiners and has neither a next hop nor an uphill hop.
    fn new(self_loc: PeerKeyLocation) -> Self {
        Self {
            self_loc,
            accept: true,
            next_hop: None,
            uphill_hop: None,
        }
    }
    /// Overrides the acceptance decision.
    fn accept(self, accept: bool) -> Self {
        Self { accept, ..self }
    }
    /// Overrides the closer-peer candidate.
    fn next_hop(self, next_hop: Option<PeerKeyLocation>) -> Self {
        Self { next_hop, ..self }
    }
    /// Overrides the uphill candidate.
    fn uphill_hop(self, uphill_hop: Option<PeerKeyLocation>) -> Self {
        Self { uphill_hop, ..self }
    }
}
/// Answers every routing query from the values scripted on `TestRelayContext`; the request
/// arguments (joiner, target location, visited set, recency, estimator) are ignored.
impl RelayContext for TestRelayContext {
    fn self_location(&self) -> &PeerKeyLocation {
        let Self { self_loc, .. } = self;
        self_loc
    }
    fn should_accept(&self, _: &KnownPeerKeyLocation) -> bool {
        self.accept
    }
    fn select_next_hop(
        &self,
        _: Location,
        _: &VisitedPeers,
        _: &HashMap<PeerKeyLocation, Instant>,
        _: &ConnectForwardEstimator,
    ) -> Option<PeerKeyLocation> {
        self.next_hop.as_ref().cloned()
    }
    fn select_uphill_hop(
        &self,
        _: Location,
        _: &VisitedPeers,
        _: &HashMap<PeerKeyLocation, Instant>,
    ) -> Option<PeerKeyLocation> {
        self.uphill_hop.as_ref().cloned()
    }
}
/// Builds a test peer at `127.0.0.1:port` with a freshly generated transport key.
fn make_peer(port: u16) -> PeerKeyLocation {
    let public_key = TransportKeypair::new().public().clone();
    PeerKeyLocation::new(public_key, SocketAddr::from((Ipv4Addr::LOCALHOST, port)))
}
/// The forward estimator must tolerate both ordinary peers and peers whose ring location
/// cannot be derived (no known socket address) without panicking.
///
/// Previously this test only recorded a peer built with a concrete address, so the
/// missing-location case named in the test was never exercised.
#[test]
fn forward_estimator_handles_missing_location() {
    let mut estimator = ConnectForwardEstimator::new();

    // Peer with a known address, so a location is available.
    let located = PeerKeyLocation::new(
        TransportKeypair::new().public().clone(),
        "127.0.0.1:1111".parse().unwrap(),
    );
    estimator.record(&located, Location::new(0.25), true);

    // Peer with an unknown address: the case this test is named after.
    let unlocated = PeerKeyLocation::with_unknown_addr(TransportKeypair::new().public().clone());
    assert!(
        unlocated.location().is_none(),
        "precondition: a peer with an unknown address should have no location"
    );
    estimator.record(&unlocated, Location::new(0.25), true);
    estimator.record(&unlocated, Location::new(0.75), false);
}
/// A forward attempt older than FORWARD_ATTEMPT_TIMEOUT is dropped by
/// `expire_forward_attempts`.
#[test]
fn expired_forward_attempts_are_cleared() {
    let estimator = Arc::new(RwLock::new(ConnectForwardEstimator::new()));
    let mut op = ConnectOp::new_joiner(Transaction::new::<ConnectMsg>(), 1, None, estimator);
    let stale_peer = make_peer(2000);
    // Backdate the attempt one second past the timeout so it is unambiguously expired.
    let stale_sent_at = Instant::now() - FORWARD_ATTEMPT_TIMEOUT - Duration::from_secs(1);
    let attempt = ForwardAttempt {
        peer: stale_peer.clone(),
        desired: Location::new(0.2),
        sent_at: stale_sent_at,
    };
    op.forward_attempts.insert(stale_peer, attempt);

    op.expire_forward_attempts(Instant::now());

    assert!(op.forward_attempts.is_empty());
}
/// Expiring a stale forward attempt must also feed a failure event into the shared
/// forward estimator, in addition to clearing the attempt.
#[test]
fn expired_forward_attempts_record_failures_in_estimator() {
    let shared_estimator = Arc::new(RwLock::new(ConnectForwardEstimator::new()));
    let mut op = ConnectOp::new_joiner(
        Transaction::new::<ConnectMsg>(),
        1,
        None,
        Arc::clone(&shared_estimator),
    );
    let stale_peer = make_peer(2000);
    op.forward_attempts.insert(
        stale_peer.clone(),
        ForwardAttempt {
            peer: stale_peer,
            desired: Location::new(0.2),
            sent_at: Instant::now() - FORWARD_ATTEMPT_TIMEOUT - Duration::from_secs(1),
        },
    );

    // Third element of the snapshot tuple is the event counter.
    let event_count = || shared_estimator.read().snapshot().2;
    let events_before = event_count();
    op.expire_forward_attempts(Instant::now());
    let events_after = event_count();

    assert!(op.forward_attempts.is_empty());
    assert!(
        events_after > events_before,
        "estimator should have recorded failure; before={events_before}, after={events_after}"
    );
}
/// With an accepting policy and no closer peer, the relay accepts the joiner itself.
/// It answers with its own key and an Unknown address, and does not forward.
#[test]
fn relay_accepts_when_policy_allows() {
    let self_loc = make_peer(4000);
    let joiner = make_peer(5000);
    let upstream_addr = joiner.socket_addr().expect("test peer must have address");
    let request = ConnectRequest {
        desired_location: Location::random(),
        joiner: joiner.clone(),
        ttl: 3,
        visited: VisitedPeers::default(),
        uphill_budget: DEFAULT_UPHILL_BUDGET,
    };
    // Fresh relay state: nothing forwarded, observed, or accepted yet.
    let mut state = RelayState {
        upstream_addr,
        request,
        forwarded_to: None,
        forwarded_at: None,
        observed_sent: false,
        accepted_locally: false,
        response_forwarded: false,
    };
    // Default context: accepts and offers no next hop.
    let ctx = TestRelayContext::new(self_loc.clone());
    let (recency, mut forward_attempts) = (HashMap::new(), HashMap::new());
    let estimator = ConnectForwardEstimator::new();

    let actions = state.handle_request(
        &ctx,
        &recency,
        &mut forward_attempts,
        &estimator,
        Instant::now(),
    );

    let accept = actions.accept.expect("expected acceptance");
    let acceptor = &accept.response.acceptor;
    assert_eq!(acceptor.pub_key(), self_loc.pub_key());
    assert!(
        acceptor.peer_addr.is_unknown(),
        "ConnectResponse acceptor should have Unknown address for NAT traversal"
    );
    assert_eq!(accept.joiner.pub_key(), joiner.pub_key());
    assert!(actions.forward.is_none());
}
/// The acceptor in a freshly built ConnectResponse carries this relay's key but an Unknown
/// address.
#[test]
fn connect_response_acceptor_starts_with_unknown_address() {
    let self_loc = make_peer(4001);
    let joiner = make_peer(5001);
    let mut state = RelayState {
        upstream_addr: joiner.socket_addr().expect("test peer must have address"),
        request: ConnectRequest {
            desired_location: Location::random(),
            joiner: joiner.clone(),
            ttl: 3,
            visited: VisitedPeers::default(),
            uphill_budget: DEFAULT_UPHILL_BUDGET,
        },
        forwarded_to: None,
        forwarded_at: None,
        observed_sent: false,
        accepted_locally: false,
        response_forwarded: false,
    };
    let ctx = TestRelayContext::new(self_loc.clone());
    let recency = HashMap::new();
    let mut forward_attempts = HashMap::new();
    let estimator = ConnectForwardEstimator::new();

    let actions = state.handle_request(
        &ctx,
        &recency,
        &mut forward_attempts,
        &estimator,
        Instant::now(),
    );

    let accept = actions.accept.expect("expected acceptance");
    let acceptor = &accept.response.acceptor;
    assert!(
        acceptor.peer_addr.is_unknown(),
        "ConnectResponse.acceptor must have Unknown address for NAT traversal"
    );
    assert_eq!(
        acceptor.pub_key(),
        self_loc.pub_key(),
        "acceptor pub_key should come from self_location"
    );
}
/// A relay that declines the joiner but knows a closer peer forwards the request to that
/// peer with TTL decremented and the joiner marked as visited.
#[test]
fn relay_forwards_when_not_accepting() {
    let (self_loc, joiner, next_hop) = (make_peer(4100), make_peer(5100), make_peer(6100));
    let joiner_addr = joiner.socket_addr().expect("test peer must have address");
    let mut state = RelayState {
        upstream_addr: joiner_addr,
        request: ConnectRequest {
            desired_location: Location::random(),
            joiner: joiner.clone(),
            ttl: 2,
            visited: VisitedPeers::default(),
            uphill_budget: DEFAULT_UPHILL_BUDGET,
        },
        forwarded_to: None,
        forwarded_at: None,
        observed_sent: false,
        accepted_locally: false,
        response_forwarded: false,
    };
    // Declines locally but has a closer peer available.
    let ctx = TestRelayContext::new(self_loc)
        .accept(false)
        .next_hop(Some(next_hop.clone()));
    let recency = HashMap::new();
    let mut forward_attempts = HashMap::new();
    let estimator = ConnectForwardEstimator::new();

    let actions = state.handle_request(
        &ctx,
        &recency,
        &mut forward_attempts,
        &estimator,
        Instant::now(),
    );

    assert!(actions.accept.is_none());
    let (forward_to, forwarded) = actions.forward.expect("expected forward");
    assert_eq!(forward_to.pub_key(), next_hop.pub_key());
    // One hop consumed: 2 -> 1.
    assert_eq!(forwarded.ttl, 1);
    // The joiner's own address is recorded in the visited set of the forwarded request.
    assert!(forwarded.visited.probably_visited(joiner_addr));
}
/// A relay that would accept (`should_accept() == true`) but still has a closer peer is
/// not the terminus, so it must forward rather than accept.
#[test]
fn relay_does_not_accept_when_not_at_terminus() {
    let self_loc = make_peer(4150);
    let joiner = make_peer(5150);
    let next_hop = make_peer(6150);
    let target_loc = next_hop.location().expect("test peer has location");

    // Same premise as `relay_does_not_accept_after_forwarding_on_retry`: this relay must be
    // farther than NEAR_TERMINUS_DISTANCE from the target. Otherwise near-terminus
    // acceptance (presumably governed by NEAR_TERMINUS_ACCEPT_PROB) could let it accept
    // and make this test nondeterministic.
    let self_to_target = self_loc
        .location()
        .expect("test peer has location")
        .distance(target_loc)
        .as_f64();
    assert!(
        self_to_target > NEAR_TERMINUS_DISTANCE,
        "test invariant: self_loc must be > NEAR_TERMINUS_DISTANCE ({NEAR_TERMINUS_DISTANCE}) from desired_location, got {self_to_target}"
    );

    let mut state = RelayState {
        upstream_addr: joiner.socket_addr().expect("test peer must have address"),
        request: ConnectRequest {
            desired_location: target_loc,
            joiner: joiner.clone(),
            ttl: 3,
            visited: VisitedPeers::default(),
            uphill_budget: DEFAULT_UPHILL_BUDGET,
        },
        forwarded_to: None,
        forwarded_at: None,
        observed_sent: false,
        accepted_locally: false,
        response_forwarded: false,
    };
    // Policy would accept, but a closer peer exists.
    let ctx = TestRelayContext::new(self_loc)
        .accept(true)
        .next_hop(Some(next_hop.clone()));
    let recency = HashMap::new();
    let mut forward_attempts = HashMap::new();
    let estimator = ConnectForwardEstimator::new();

    let actions = state.handle_request(
        &ctx,
        &recency,
        &mut forward_attempts,
        &estimator,
        Instant::now(),
    );

    assert!(
        actions.accept.is_none(),
        "relay should NOT accept when not at terminus, even if should_accept() is true"
    );
    let (forward_to, _) = actions
        .forward
        .expect("relay should forward when not at terminus");
    assert_eq!(forward_to.pub_key(), next_hop.pub_key());
}
/// When the joiner arrives without a known address, the relay records the upstream address
/// it saw (the joiner's external NAT address) and reports it back via ObservedAddress.
#[test]
fn relay_emits_observed_address_when_discovering_joiner_addr() {
    let self_loc = make_peer(4050);
    let joiner_base = make_peer(5050);
    // Same port as the joiner's local socket, seen from a public IP (203.0.113.0/24,
    // RFC 5737 TEST-NET-3).
    let joiner_port = joiner_base
        .socket_addr()
        .expect("test peer must have address")
        .port();
    let external_nat_addr =
        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 10)), joiner_port);
    // The joiner does not know its own public address yet.
    let joiner_with_unknown_addr =
        PeerKeyLocation::with_unknown_addr(joiner_base.pub_key().clone());
    let mut state = RelayState {
        upstream_addr: external_nat_addr,
        request: ConnectRequest {
            desired_location: Location::random(),
            joiner: joiner_with_unknown_addr,
            ttl: 3,
            visited: VisitedPeers::default(),
            uphill_budget: DEFAULT_UPHILL_BUDGET,
        },
        forwarded_to: None,
        forwarded_at: None,
        observed_sent: false,
        accepted_locally: false,
        response_forwarded: false,
    };
    let ctx = TestRelayContext::new(self_loc);
    let recency = HashMap::new();
    let mut forward_attempts = HashMap::new();
    let estimator = ConnectForwardEstimator::new();

    let actions = state.handle_request(
        &ctx,
        &recency,
        &mut forward_attempts,
        &estimator,
        Instant::now(),
    );

    let (target, addr) = actions.observed_address.expect(
        "gateway should emit ObservedAddress when discovering joiner's external address",
    );
    assert_eq!(addr, external_nat_addr);
    let target_addr = target.socket_addr().expect("target must have address");
    assert_eq!(target_addr, external_nat_addr);
    // The stored request must also have been patched with the discovered address.
    let discovered = state
        .request
        .joiner
        .socket_addr()
        .expect("joiner must have address after discovery");
    assert_eq!(discovered, external_nat_addr);
}
/// When the joiner already carries the same address the relay sees upstream, nothing new
/// was discovered, so no ObservedAddress is emitted.
#[test]
fn relay_does_not_emit_observed_address_when_joiner_addr_already_known() {
    let self_loc = make_peer(4051);
    let joiner_base = make_peer(5051);
    let port = joiner_base
        .socket_addr()
        .expect("test peer must have address")
        .port();
    let known_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 11)), port);
    // Joiner and upstream agree on the address.
    let joiner_with_known_addr =
        PeerKeyLocation::new(joiner_base.pub_key().clone(), known_addr);
    let mut state = RelayState {
        upstream_addr: known_addr,
        request: ConnectRequest {
            desired_location: Location::random(),
            joiner: joiner_with_known_addr,
            ttl: 3,
            visited: VisitedPeers::default(),
            uphill_budget: DEFAULT_UPHILL_BUDGET,
        },
        forwarded_to: None,
        forwarded_at: None,
        observed_sent: false,
        accepted_locally: false,
        response_forwarded: false,
    };
    let ctx = TestRelayContext::new(self_loc);
    let recency = HashMap::new();
    let mut forward_attempts = HashMap::new();
    let estimator = ConnectForwardEstimator::new();

    let actions = state.handle_request(
        &ctx,
        &recency,
        &mut forward_attempts,
        &estimator,
        Instant::now(),
    );

    assert!(
        actions.observed_address.is_none(),
        "non-first hop should NOT emit ObservedAddress when joiner addr already known"
    );
}
/// A joiner targeting one connection is satisfied by its first acceptance and reports that
/// acceptor as new.
#[test]
fn joiner_tracks_acceptance() {
    let acceptor = make_peer(7000);
    // Joiner that wants exactly one connection and has none yet.
    let mut state = JoinerState {
        target_connections: 1,
        observed_address: None,
        accepted: HashSet::new(),
        last_progress: Instant::now(),
        started_without_address: true,
    };
    let response = ConnectResponse {
        acceptor: acceptor.clone(),
    };

    let outcome = state.register_acceptance(&response, Instant::now());

    assert!(outcome.satisfied);
    let newcomer = outcome.new_acceptor.expect("expected new acceptor");
    assert_eq!(newcomer.peer.pub_key(), acceptor.pub_key());
}
/// `initiate_join_request` builds a Request carrying the desired location and TTL. Both the
/// joiner and the first target are pre-marked as visited, and the op waits for responses.
#[test]
fn init_join_request_initializes_state() {
    let target = make_peer(7200);
    let desired = Location::random();
    let ttl = 5;
    let own = make_peer(7300);
    let (tx, op, msg) = ConnectOp::initiate_join_request(
        own.clone(),
        target.clone(),
        desired,
        ttl,
        2,
        Arc::new(RwLock::new(ConnectForwardEstimator::new())),
        &[],
    );
    match msg {
        ConnectMsg::Request { payload, .. } => {
            assert_eq!(payload.desired_location, desired);
            assert_eq!(payload.ttl, ttl);
            // Restore the per-transaction hash keys before querying the bloom filter.
            let visited = payload.visited.with_transaction(&tx);
            // make_peer always assigns an address. A missing one is a setup bug and must not
            // silently skip the visited-set checks (the previous `if let` did exactly that).
            let own_addr = own.socket_addr().expect("test peer must have address");
            let target_addr = target.socket_addr().expect("test peer must have address");
            assert!(
                visited.probably_visited(own_addr),
                "joiner's own address should be marked visited"
            );
            assert!(
                visited.probably_visited(target_addr),
                "initial target's address should be marked visited"
            );
        }
        other @ ConnectMsg::Response { .. }
        | other @ ConnectMsg::ObservedAddress { .. }
        | other @ ConnectMsg::Rejected { .. }
        | other @ ConnectMsg::ConnectFailed { .. } => panic!("unexpected message: {other:?}"),
    }
    assert!(matches!(
        op.state,
        Some(ConnectState::WaitingForResponses(_))
    ));
}
/// Two-hop path: relay A declines and forwards to relay B (TTL 3 -> 2, A recorded as
/// visited), then relay B accepts the forwarded request on behalf of the original joiner.
#[test]
fn multi_hop_forward_path_tracks_ttl_and_visited_peers() {
    let joiner = make_peer(8100);
    let relay_a = make_peer(8200);
    let relay_b = make_peer(8300);
    let joiner_addr = joiner.socket_addr().expect("test peer must have address");
    let tx = Transaction::new::<ConnectMsg>();

    // Hop 1: the joiner has already marked itself visited before contacting relay A.
    let mut visited = VisitedPeers::new(&tx);
    visited.mark_visited(joiner_addr);
    let initial_request = ConnectRequest {
        desired_location: Location::random(),
        joiner: joiner.clone(),
        ttl: 3,
        visited,
        uphill_budget: DEFAULT_UPHILL_BUDGET,
    };
    let mut relay_a_op = ConnectOp::new_relay(
        tx,
        joiner_addr,
        initial_request.clone(),
        Arc::new(RwLock::new(ConnectForwardEstimator::new())),
    );
    // Relay A declines and names relay B as the closer peer.
    let relay_a_ctx = TestRelayContext::new(relay_a.clone())
        .accept(false)
        .next_hop(Some(relay_b.clone()));
    let relay_a_estimator = ConnectForwardEstimator::new();
    let hop1 = relay_a_op.handle_request(
        &relay_a_ctx,
        joiner_addr,
        initial_request,
        &relay_a_estimator,
        Instant::now(),
    );
    let (hop2_target, hop2_request) = hop1
        .forward
        .expect("relay should forward when it declines to accept");
    assert_eq!(hop2_target.pub_key(), relay_b.pub_key());
    assert_eq!(hop2_request.ttl, 2);
    let relay_a_addr = relay_a.socket_addr().expect("test peer must have address");
    assert!(
        hop2_request.visited.probably_visited(relay_a_addr),
        "forwarded request should record intermediate relay's address"
    );

    // Hop 2: relay B receives the forwarded request from relay A and accepts it.
    let mut relay_b_op = ConnectOp::new_relay(
        tx,
        relay_a_addr,
        hop2_request.clone(),
        Arc::new(RwLock::new(ConnectForwardEstimator::new())),
    );
    let relay_b_ctx = TestRelayContext::new(relay_b.clone());
    let relay_b_estimator = ConnectForwardEstimator::new();
    let hop2 = relay_b_op.handle_request(
        &relay_b_ctx,
        relay_a_addr,
        hop2_request,
        &relay_b_estimator,
        Instant::now(),
    );
    let accept = hop2
        .accept
        .expect("second relay should accept when policy allows");
    assert_eq!(accept.response.acceptor.pub_key(), relay_b.pub_key());
    assert_eq!(accept.joiner.pub_key(), joiner.pub_key());
}
/// When the request's joiner already carries an observed public address, acceptance must
/// expect the connection from that address. The private upstream address must not be used.
#[test]
fn connect_expect_connection_uses_observed_address() {
    // The joiner's own view: a private LAN address.
    let private_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 9000);
    let keypair = TransportKeypair::new();
    let joiner_original = PeerKeyLocation::new(keypair.public().clone(), private_addr);
    // The same key as observed from outside the NAT.
    let observed_public_addr =
        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 50)), 9000);
    let joiner_with_observed_addr =
        PeerKeyLocation::new(keypair.public().clone(), observed_public_addr);
    let relay = make_peer(5000);
    // upstream_addr is deliberately the private address, so the assertion can tell which of
    // the two addresses acceptance used.
    let mut state = RelayState {
        upstream_addr: private_addr,
        request: ConnectRequest {
            desired_location: Location::random(),
            joiner: joiner_with_observed_addr,
            ttl: 3,
            visited: VisitedPeers::default(),
            uphill_budget: DEFAULT_UPHILL_BUDGET,
        },
        forwarded_to: None,
        forwarded_at: None,
        observed_sent: false,
        accepted_locally: false,
        response_forwarded: false,
    };
    let ctx = TestRelayContext::new(relay);
    let recency = HashMap::new();
    let mut forward_attempts = HashMap::new();
    let estimator = ConnectForwardEstimator::new();

    let actions = state.handle_request(
        &ctx,
        &recency,
        &mut forward_attempts,
        &estimator,
        Instant::now(),
    );

    let accept = actions
        .accept
        .expect("expect_connection_from should be set when accepting");
    let expected_from = accept
        .joiner
        .socket_addr()
        .expect("expect_connection_from must have address");
    assert_eq!(
        expected_from,
        observed_public_addr,
        "expect_connection_from must use observed external address ({}) not private address ({})",
        observed_public_addr,
        private_addr
    );
    let original_addr = joiner_original
        .socket_addr()
        .expect("original joiner must have address");
    assert_eq!(
        original_addr, private_addr,
        "original joiner should have private address"
    );
}
/// Driving acceptance through `ConnectOp::handle_request` yields a ConnectResponse whose
/// acceptor has this relay's key and an Unknown address.
#[test]
fn connect_response_acceptor_has_unknown_address() {
    let joiner = make_peer(9100);
    let acceptor_peer = make_peer(9200);
    let joiner_addr = joiner.socket_addr().expect("test peer must have address");
    let tx = Transaction::new::<ConnectMsg>();
    let mut visited = VisitedPeers::new(&tx);
    visited.mark_visited(joiner_addr);
    let request = ConnectRequest {
        desired_location: Location::random(),
        joiner,
        ttl: 3,
        visited,
        uphill_budget: DEFAULT_UPHILL_BUDGET,
    };
    let shared_estimator = Arc::new(RwLock::new(ConnectForwardEstimator::new()));
    let mut relay_op = ConnectOp::new_relay(tx, joiner_addr, request.clone(), shared_estimator);
    let ctx = TestRelayContext::new(acceptor_peer.clone());
    let estimator = ConnectForwardEstimator::new();

    let actions = relay_op.handle_request(&ctx, joiner_addr, request, &estimator, Instant::now());

    let accept = actions
        .accept
        .expect("acceptor should issue ConnectResponse");
    let acceptor = &accept.response.acceptor;
    assert!(
        acceptor.peer_addr.is_unknown(),
        "acceptor address should be Unknown for NAT traversal. Got: {:?}",
        acceptor.peer_addr
    );
    assert_eq!(
        acceptor.pub_key(),
        acceptor_peer.pub_key(),
        "acceptor pub_key should match"
    );
}
/// Once a relay has forwarded a request, seeing the same request again must not make it
/// accept, even if it now looks like the terminus. It must not forward a second time either.
#[test]
fn relay_does_not_accept_after_forwarding_on_retry() {
    let self_loc = make_peer(4200);
    let joiner = make_peer(5200);
    let next_hop = make_peer(6200);
    let target_loc = next_hop.location().expect("test peer has location");
    // Premise: self lies outside the near-terminus radius of the target, so near-terminus
    // acceptance should not apply to the first call.
    let self_to_target = self_loc
        .location()
        .expect("test peer has location")
        .distance(target_loc)
        .as_f64();
    assert!(
        self_to_target > NEAR_TERMINUS_DISTANCE,
        "test invariant: self_loc must be > NEAR_TERMINUS_DISTANCE ({NEAR_TERMINUS_DISTANCE}) from desired_location, got {self_to_target}"
    );
    let mut state = RelayState {
        upstream_addr: joiner.socket_addr().expect("test peer must have address"),
        request: ConnectRequest {
            desired_location: target_loc,
            joiner: joiner.clone(),
            ttl: 3,
            visited: VisitedPeers::default(),
            uphill_budget: DEFAULT_UPHILL_BUDGET,
        },
        forwarded_to: None,
        forwarded_at: None,
        observed_sent: false,
        accepted_locally: false,
        response_forwarded: false,
    };
    let recency = HashMap::new();
    let mut forward_attempts = HashMap::new();
    let estimator = ConnectForwardEstimator::new();

    // First pass: policy accepts, but a closer peer exists, so the relay forwards.
    let first_ctx = TestRelayContext::new(self_loc.clone())
        .accept(true)
        .next_hop(Some(next_hop.clone()));
    let first = state.handle_request(
        &first_ctx,
        &recency,
        &mut forward_attempts,
        &estimator,
        Instant::now(),
    );
    assert!(first.accept.is_none(), "first call should forward, not accept");
    assert!(
        first.forward.is_some(),
        "first call should produce a forward action"
    );
    assert!(
        state.forwarded_to.is_some(),
        "forwarded_to should be set after forwarding"
    );

    // Retry: no closer peer now (terminus-like), but the request was already forwarded.
    let retry_ctx = TestRelayContext::new(self_loc).accept(true).next_hop(None);
    let retry = state.handle_request(
        &retry_ctx,
        &recency,
        &mut forward_attempts,
        &estimator,
        Instant::now(),
    );
    assert!(
        retry.accept.is_none(),
        "second call should NOT accept even though is_terminus=true (already forwarded)"
    );
    assert!(
        retry.forward.is_none(),
        "second call should not forward again"
    );
}
/// A relay whose state already records a forward neither accepts nor re-forwards, even when
/// it appears to be the terminus.
#[test]
fn relay_with_forwarded_state_rejects_acceptance_at_terminus() {
    let self_loc = make_peer(4250);
    let joiner = make_peer(5250);
    let prior_next_hop = make_peer(6250);
    let desired_location = prior_next_hop
        .location()
        .expect("test peer has location");
    let upstream_addr = joiner.socket_addr().expect("test peer must have address");
    // State as it would look after an earlier forward to `prior_next_hop`.
    let mut state = RelayState {
        upstream_addr,
        request: ConnectRequest {
            desired_location,
            joiner,
            ttl: 3,
            visited: VisitedPeers::default(),
            uphill_budget: DEFAULT_UPHILL_BUDGET,
        },
        forwarded_to: Some(prior_next_hop),
        forwarded_at: Some(Instant::now()),
        observed_sent: false,
        accepted_locally: false,
        response_forwarded: false,
    };
    let ctx = TestRelayContext::new(self_loc).accept(true).next_hop(None);
    let recency = HashMap::new();
    let mut forward_attempts = HashMap::new();
    let estimator = ConnectForwardEstimator::new();

    let actions = state.handle_request(
        &ctx,
        &recency,
        &mut forward_attempts,
        &estimator,
        Instant::now(),
    );

    assert!(
        actions.accept.is_none(),
        "relay must not accept once forwarded_to is set, even at terminus"
    );
    assert!(
        actions.forward.is_none(),
        "relay must not forward again when forwarded_to is already set"
    );
}
/// Regression: when a relay re-enters `handle_request` with a request that went through
/// serialization, the visited bloom filter must be re-keyed with the transaction.
/// Otherwise previously marked peers are no longer detected.
#[test]
fn regression_visited_hash_keys_preserved_across_reentry() {
    let self_loc = make_peer(4800);
    let upstream = make_peer(4801);
    let joiner = make_peer(4802);
    let marked_peer = make_peer(4803);
    let tx = Transaction::new::<ConnectMsg>();
    let marked_addr = marked_peer
        .socket_addr()
        .expect("test peer must have address");
    let upstream_addr = upstream.socket_addr().expect("test peer must have address");

    let mut visited = VisitedPeers::new(&tx);
    visited.mark_visited(marked_addr);
    assert!(
        visited.probably_visited(marked_addr),
        "precondition: marked peer must be detectable before the wire round-trip"
    );
    let request = ConnectRequest {
        desired_location: Location::random(),
        joiner,
        ttl: 3,
        visited,
        uphill_budget: DEFAULT_UPHILL_BUDGET,
    };
    let mut op = ConnectOp::new_relay(
        tx,
        upstream_addr,
        request.clone(),
        Arc::new(RwLock::new(ConnectForwardEstimator::new())),
    );

    // Simulate the request arriving over the wire: the hash keys are #[serde(skip)], so the
    // deserialized filter cannot answer correctly until re-keyed.
    let reentered: ConnectRequest = bincode::deserialize(
        &bincode::serialize(&request).expect("serialize ConnectRequest"),
    )
    .expect("deserialize ConnectRequest");
    assert!(
        !reentered.visited.probably_visited(marked_addr),
        "sanity: deserialized bloom must fail to detect marked peer without re-keying (hash_keys are #[serde(skip)])"
    );

    let ctx = TestRelayContext::new(self_loc).accept(false).next_hop(None);
    let estimator = ConnectForwardEstimator::new();
    let _actions = op.handle_request(&ctx, upstream_addr, reentered, &estimator, Instant::now());

    let Some(ConnectState::Relaying(relay_state)) = op.state.as_ref() else {
        panic!(
            "expected Relaying state after handle_request, got {:?}",
            op.state.as_ref()
        );
    };
    assert!(
        relay_state.request.visited.probably_visited(marked_addr),
        "bloom corruption after re-entry: previously-marked peer no longer detected. \
         VisitedPeers::hash_keys were not restored via with_transaction() when \
         assigning the deserialized request to state.request."
    );
}
/// With no closer peer to forward to and an accepting policy, the relay is the terminus:
/// it accepts and does not forward.
#[test]
fn terminus_reached_when_no_closer_peers() {
    let self_loc = make_peer(4300);
    let joiner = make_peer(5300);
    let upstream_addr = joiner.socket_addr().expect("test peer must have address");
    let mut state = RelayState {
        upstream_addr,
        request: ConnectRequest {
            desired_location: Location::random(),
            joiner,
            ttl: 3,
            visited: VisitedPeers::default(),
            uphill_budget: DEFAULT_UPHILL_BUDGET,
        },
        forwarded_to: None,
        forwarded_at: None,
        observed_sent: false,
        accepted_locally: false,
        response_forwarded: false,
    };
    let ctx = TestRelayContext::new(self_loc).accept(true).next_hop(None);
    let (recency, mut forward_attempts) = (HashMap::new(), HashMap::new());
    let estimator = ConnectForwardEstimator::new();

    let actions = state.handle_request(
        &ctx,
        &recency,
        &mut forward_attempts,
        &estimator,
        Instant::now(),
    );

    assert!(
        actions.accept.is_some(),
        "should accept at terminus (no closer peers)"
    );
    assert!(actions.forward.is_none(), "should not forward at terminus");
}
/// With TTL exhausted, a relay ignores its available next hop and accepts instead.
#[test]
fn ttl_zero_prevents_forwarding() {
    let self_loc = make_peer(4400);
    let joiner = make_peer(5400);
    let next_hop = make_peer(6400);
    let upstream_addr = joiner.socket_addr().expect("test peer must have address");
    let mut state = RelayState {
        upstream_addr,
        request: ConnectRequest {
            desired_location: Location::random(),
            joiner,
            // No hops left.
            ttl: 0,
            visited: VisitedPeers::default(),
            uphill_budget: DEFAULT_UPHILL_BUDGET,
        },
        forwarded_to: None,
        forwarded_at: None,
        observed_sent: false,
        accepted_locally: false,
        response_forwarded: false,
    };
    // A closer peer exists, but TTL 0 should make it irrelevant.
    let ctx = TestRelayContext::new(self_loc)
        .accept(true)
        .next_hop(Some(next_hop));
    let recency = HashMap::new();
    let mut forward_attempts = HashMap::new();
    let estimator = ConnectForwardEstimator::new();

    let actions = state.handle_request(
        &ctx,
        &recency,
        &mut forward_attempts,
        &estimator,
        Instant::now(),
    );

    assert!(
        actions.forward.is_none(),
        "should not forward when TTL is 0"
    );
    assert!(
        actions.accept.is_some(),
        "should accept at terminus (TTL exhausted)"
    );
}
/// At the terminus with a declining policy, the relay routes uphill. TTL drops by exactly
/// one, the uphill budget drops by one, and the forward is recorded in state.
#[test]
fn uphill_routing_when_at_terminus_but_cannot_accept() {
    let self_loc = make_peer(4500);
    let joiner = make_peer(5500);
    let uphill_peer = make_peer(6500);
    let upstream_addr = joiner.socket_addr().expect("test peer must have address");
    let mut state = RelayState {
        upstream_addr,
        request: ConnectRequest {
            desired_location: Location::random(),
            joiner,
            ttl: 3,
            visited: VisitedPeers::default(),
            uphill_budget: DEFAULT_UPHILL_BUDGET,
        },
        forwarded_to: None,
        forwarded_at: None,
        observed_sent: false,
        accepted_locally: false,
        response_forwarded: false,
    };
    // Declines, has no closer peer (terminus), but has an uphill candidate.
    let ctx = TestRelayContext::new(self_loc)
        .accept(false)
        .next_hop(None)
        .uphill_hop(Some(uphill_peer.clone()));
    let recency = HashMap::new();
    let mut forward_attempts = HashMap::new();
    let estimator = ConnectForwardEstimator::new();

    let actions = state.handle_request(
        &ctx,
        &recency,
        &mut forward_attempts,
        &estimator,
        Instant::now(),
    );

    assert!(
        actions.accept.is_none(),
        "should not accept when should_accept() returns false"
    );
    let (forward_to, uphill_request) = actions
        .forward
        .expect("should route uphill when at terminus but cannot accept");
    assert_eq!(forward_to.pub_key(), uphill_peer.pub_key());
    assert_eq!(
        uphill_request.ttl, 2,
        "TTL should be decremented by 1 only (no extra burn): 3 - 1 = 2"
    );
    assert_eq!(
        uphill_request.uphill_budget,
        DEFAULT_UPHILL_BUDGET - 1,
        "uphill_budget should be decremented by 1 on uphill forward"
    );
    assert!(
        state.forwarded_to.is_some(),
        "forwarded_to should be set after uphill routing"
    );
}
/// At the terminus with a declining policy and no uphill candidate, the relay neither
/// accepts nor forwards.
#[test]
fn no_forward_when_at_terminus_cannot_accept_and_no_uphill_peers() {
    let self_loc = make_peer(4600);
    let joiner = make_peer(5600);
    let upstream_addr = joiner.socket_addr().expect("test peer must have address");
    let mut state = RelayState {
        upstream_addr,
        request: ConnectRequest {
            desired_location: Location::random(),
            joiner,
            ttl: 3,
            visited: VisitedPeers::default(),
            uphill_budget: DEFAULT_UPHILL_BUDGET,
        },
        forwarded_to: None,
        forwarded_at: None,
        observed_sent: false,
        accepted_locally: false,
        response_forwarded: false,
    };
    // Declines, no closer peer, no uphill peer: nowhere to go.
    let ctx = TestRelayContext::new(self_loc)
        .accept(false)
        .next_hop(None)
        .uphill_hop(None);
    let recency = HashMap::new();
    let mut forward_attempts = HashMap::new();
    let estimator = ConnectForwardEstimator::new();

    let actions = state.handle_request(
        &ctx,
        &recency,
        &mut forward_attempts,
        &estimator,
        Instant::now(),
    );

    assert!(
        actions.accept.is_none(),
        "should not accept when should_accept() returns false"
    );
    assert!(
        actions.forward.is_none(),
        "should not forward when no uphill peers available"
    );
}
/// An uphill candidate is ignored when TTL is already 0: the relay neither accepts nor
/// forwards.
#[test]
fn no_uphill_routing_when_ttl_exhausted() {
    let self_loc = make_peer(4700);
    let joiner = make_peer(5700);
    let uphill_peer = make_peer(6700);
    let upstream_addr = joiner.socket_addr().expect("test peer must have address");
    let mut state = RelayState {
        upstream_addr,
        request: ConnectRequest {
            desired_location: Location::random(),
            joiner,
            // No hops left.
            ttl: 0,
            visited: VisitedPeers::default(),
            uphill_budget: DEFAULT_UPHILL_BUDGET,
        },
        forwarded_to: None,
        forwarded_at: None,
        observed_sent: false,
        accepted_locally: false,
        response_forwarded: false,
    };
    let ctx = TestRelayContext::new(self_loc)
        .accept(false)
        .next_hop(None)
        .uphill_hop(Some(uphill_peer));
    let recency = HashMap::new();
    let mut forward_attempts = HashMap::new();
    let estimator = ConnectForwardEstimator::new();

    let actions = state.handle_request(
        &ctx,
        &recency,
        &mut forward_attempts,
        &estimator,
        Instant::now(),
    );

    assert!(
        actions.accept.is_none(),
        "should not accept when should_accept() returns false"
    );
    assert!(
        actions.forward.is_none(),
        "should not forward uphill when TTL is exhausted"
    );
}
/// With TTL remaining but the uphill budget spent, the relay does not route uphill and
/// instead marks the request as rejected.
#[test]
fn no_uphill_routing_when_budget_exhausted() {
    let self_loc = make_peer(4750);
    let joiner = make_peer(5750);
    let uphill_peer = make_peer(6750);
    let upstream_addr = joiner.socket_addr().expect("test peer must have address");
    let mut state = RelayState {
        upstream_addr,
        request: ConnectRequest {
            desired_location: Location::random(),
            joiner,
            // Plenty of hops left...
            ttl: 5,
            visited: VisitedPeers::default(),
            // ...but no uphill moves remaining.
            uphill_budget: 0,
        },
        forwarded_to: None,
        forwarded_at: None,
        observed_sent: false,
        accepted_locally: false,
        response_forwarded: false,
    };
    let ctx = TestRelayContext::new(self_loc)
        .accept(false)
        .next_hop(None)
        .uphill_hop(Some(uphill_peer));
    let recency = HashMap::new();
    let mut forward_attempts = HashMap::new();
    let estimator = ConnectForwardEstimator::new();

    let actions = state.handle_request(
        &ctx,
        &recency,
        &mut forward_attempts,
        &estimator,
        Instant::now(),
    );

    assert!(
        actions.accept.is_none(),
        "should not accept when should_accept() returns false"
    );
    assert!(
        actions.forward.is_none(),
        "should not forward uphill when budget is exhausted"
    );
    assert!(actions.rejected, "should reject when budget is exhausted");
}
/// Only addresses passed to `mark_visited` are reported by `probably_visited`.
#[test]
fn test_visited_peers_tracks_addresses() {
    let tx = Transaction::new::<ConnectMsg>();
    let mut visited = VisitedPeers::new(&tx);
    let [first, second, third]: [SocketAddr; 3] = [
        "127.0.0.1:8001".parse().unwrap(),
        "127.0.0.1:8002".parse().unwrap(),
        "127.0.0.1:8003".parse().unwrap(),
    ];
    // A fresh filter reports nothing as visited.
    for addr in [first, second, third] {
        assert!(!visited.probably_visited(addr));
    }
    visited.mark_visited(first);
    visited.mark_visited(second);
    // The two marked addresses are now hits; the untouched one is still a miss.
    assert!(visited.probably_visited(first));
    assert!(visited.probably_visited(second));
    assert!(!visited.probably_visited(third));
}
/// A `VisitedPeers` round-tripped through bincode and re-bound to its transaction
/// via `with_transaction` still recognizes previously marked addresses.
#[test]
fn test_visited_peers_hash_key_restoration() {
    let tx = Transaction::new::<ConnectMsg>();
    let mut original = VisitedPeers::new(&tx);
    let addr: SocketAddr = "192.168.1.1:9000".parse().unwrap();
    original.mark_visited(addr);
    let bytes = bincode::serialize(&original).unwrap();
    // Re-attach the transaction after decoding. The test name suggests the hash
    // keys are not carried on the wire.
    let restored = bincode::deserialize::<VisitedPeers>(&bytes)
        .unwrap()
        .with_transaction(&tx);
    assert!(
        restored.probably_visited(addr),
        "visited address should be detected after hash key restoration"
    );
}
/// The initial join request built by `ConnectOp::initiate_join_request` must
/// already mark both the joiner's own address and the first target as visited.
#[test]
fn test_initiate_join_request_uses_bloom_filter() {
    let own = make_peer(8001);
    let target = make_peer(8002);
    let desired_location = Location::try_from(0.5).unwrap();
    let estimator = Arc::new(RwLock::new(ConnectForwardEstimator::new()));
    let (tx, _op, msg) = ConnectOp::initiate_join_request(
        own.clone(),
        target.clone(),
        desired_location,
        10,
        3,
        estimator,
        &[],
    );
    let request = match msg {
        ConnectMsg::Request { payload, .. } => payload,
        ConnectMsg::Response { .. }
        | ConnectMsg::ObservedAddress { .. }
        | ConnectMsg::Rejected { .. }
        | ConnectMsg::ConnectFailed { .. } => panic!("Expected ConnectMsg::Request"),
    };
    let visited = request.visited.with_transaction(&tx);
    // Other tests in this module rely on `make_peer` producing addressed peers.
    // Fail loudly here too, instead of silently skipping the assertions when an
    // address is missing.
    let own_addr = own.socket_addr().expect("test peer must have address");
    let target_addr = target.socket_addr().expect("test peer must have address");
    assert!(
        visited.probably_visited(own_addr),
        "own address should be marked visited in initial request"
    );
    assert!(
        visited.probably_visited(target_addr),
        "target address should be marked visited in initial request"
    );
}
/// `SkipListWithSelf` treats both visited addresses and the local address as
/// members, and nothing else.
#[test]
fn test_skip_list_with_bloom_filter() {
    let tx = Transaction::new::<ConnectMsg>();
    let mut visited = VisitedPeers::new(&tx);
    let parse = |text: &str| -> SocketAddr { text.parse().unwrap() };
    let visited_addr = parse("10.0.0.1:5000");
    let self_addr = parse("10.0.0.2:5000");
    let other_addr = parse("10.0.0.3:5000");
    visited.mark_visited(visited_addr);
    let skip = SkipListWithSelf {
        visited: &visited,
        self_addr: Some(self_addr),
    };
    let expectations = [
        (visited_addr, true, "visited address should be in skip list"),
        (self_addr, true, "self address should be in skip list"),
        (other_addr, false, "unvisited address should not be in skip list"),
    ];
    for (addr, should_skip, msg) in expectations {
        assert!(skip.has_element(addr) == should_skip, "{msg}");
    }
}
/// The relay declines locally and no uphill hop is offered, so the request is
/// rejected without accepting or forwarding.
#[test]
fn relay_rejects_when_no_uphill_peers_available() {
    let self_loc = make_peer(4000);
    let joiner = make_peer(5000);
    let upstream_addr = joiner
        .socket_addr()
        .expect("test peer must have address");
    let request = ConnectRequest {
        desired_location: Location::random(),
        joiner: joiner.clone(),
        ttl: 3,
        visited: VisitedPeers::default(),
        uphill_budget: DEFAULT_UPHILL_BUDGET,
    };
    let mut state = RelayState {
        upstream_addr,
        request,
        forwarded_to: None,
        forwarded_at: None,
        observed_sent: false,
        accepted_locally: false,
        response_forwarded: false,
    };
    let ctx = TestRelayContext::new(self_loc).accept(false).uphill_hop(None);
    let recency = HashMap::new();
    let mut attempts = HashMap::new();
    let estimator = ConnectForwardEstimator::new();
    let outcome =
        state.handle_request(&ctx, &recency, &mut attempts, &estimator, Instant::now());
    assert!(outcome.rejected, "should reject when no uphill peers");
    assert!(outcome.accept.is_none());
    assert!(outcome.forward.is_none());
}
/// A request that arrives with TTL 0 at a non-accepting relay is rejected.
#[test]
fn relay_rejects_when_ttl_exhausted() {
    let self_loc = make_peer(4000);
    let joiner = make_peer(5000);
    let upstream_addr = joiner
        .socket_addr()
        .expect("test peer must have address");
    let request = ConnectRequest {
        desired_location: Location::random(),
        joiner: joiner.clone(),
        ttl: 0,
        visited: VisitedPeers::default(),
        uphill_budget: DEFAULT_UPHILL_BUDGET,
    };
    let mut state = RelayState {
        upstream_addr,
        request,
        forwarded_to: None,
        forwarded_at: None,
        observed_sent: false,
        accepted_locally: false,
        response_forwarded: false,
    };
    let ctx = TestRelayContext::new(self_loc).accept(false);
    let recency = HashMap::new();
    let mut attempts = HashMap::new();
    let estimator = ConnectForwardEstimator::new();
    let outcome =
        state.handle_request(&ctx, &recency, &mut attempts, &estimator, Instant::now());
    assert!(outcome.rejected, "should reject when TTL exhausted");
    assert!(outcome.accept.is_none());
    assert!(outcome.forward.is_none());
}
/// When an uphill hop is available, the relay forwards there, does not reject,
/// and records the forwarding time.
#[test]
fn relay_routes_uphill_when_available() {
    let self_loc = make_peer(4000);
    let joiner = make_peer(5000);
    let uphill = make_peer(6000);
    let upstream_addr = joiner
        .socket_addr()
        .expect("test peer must have address");
    let request = ConnectRequest {
        desired_location: Location::random(),
        joiner: joiner.clone(),
        ttl: 3,
        visited: VisitedPeers::default(),
        uphill_budget: DEFAULT_UPHILL_BUDGET,
    };
    let mut state = RelayState {
        upstream_addr,
        request,
        forwarded_to: None,
        forwarded_at: None,
        observed_sent: false,
        accepted_locally: false,
        response_forwarded: false,
    };
    let ctx = TestRelayContext::new(self_loc)
        .accept(false)
        .uphill_hop(Some(uphill));
    let recency = HashMap::new();
    let mut attempts = HashMap::new();
    let estimator = ConnectForwardEstimator::new();
    let outcome =
        state.handle_request(&ctx, &recency, &mut attempts, &estimator, Instant::now());
    assert!(!outcome.rejected, "should not reject when uphill available");
    assert!(outcome.forward.is_some(), "should forward uphill");
    assert!(outcome.accept.is_none());
    // handle_request must also stamp the forwarding time on the state.
    assert!(
        state.forwarded_at.is_some(),
        "forwarded_at should be set after forwarding"
    );
}
/// The first pass forwards to `peer_a`. A rejection is then simulated by clearing
/// the forwarding fields, and a second pass offered `peer_b` must forward to it.
#[test]
fn relay_retries_different_uphill_peer_after_rejection() {
    let self_loc = make_peer(4000);
    let joiner = make_peer(5000);
    let peer_a = make_peer(6000);
    let peer_b = make_peer(7000);
    let upstream_addr = joiner
        .socket_addr()
        .expect("test peer must have address");
    let request = ConnectRequest {
        desired_location: Location::random(),
        joiner: joiner.clone(),
        ttl: 3,
        visited: VisitedPeers::default(),
        uphill_budget: DEFAULT_UPHILL_BUDGET,
    };
    let mut state = RelayState {
        upstream_addr,
        request,
        forwarded_to: None,
        forwarded_at: None,
        observed_sent: false,
        accepted_locally: false,
        response_forwarded: false,
    };
    let first_ctx = TestRelayContext::new(self_loc.clone())
        .accept(false)
        .uphill_hop(Some(peer_a.clone()));
    let mut attempts = HashMap::new();
    let estimator = ConnectForwardEstimator::new();
    let recency = HashMap::new();
    let first =
        state.handle_request(&first_ctx, &recency, &mut attempts, &estimator, Instant::now());
    assert!(first.forward.is_some(), "should forward to peer_a");
    let first_target = state.forwarded_to.as_ref().unwrap();
    assert_eq!(first_target.pub_key(), peer_a.pub_key());

    // Simulate peer_a rejecting: drop the forwarding bookkeeping and retry.
    state.forwarded_to = None;
    state.forwarded_at = None;
    let retry_ctx = TestRelayContext::new(self_loc)
        .accept(false)
        .uphill_hop(Some(peer_b.clone()));
    let retry =
        state.handle_request(&retry_ctx, &recency, &mut attempts, &estimator, Instant::now());
    assert!(retry.forward.is_some(), "should forward to peer_b on retry");
    let (next_hop, _) = retry.forward.unwrap();
    assert_eq!(next_hop.pub_key(), peer_b.pub_key());
}
/// After an initial forward is abandoned, a retry with no uphill hop and no
/// local acceptance ends in rejection.
#[test]
fn relay_rejects_when_no_uphill_peers_on_retry() {
    let self_loc = make_peer(4000);
    let joiner = make_peer(5000);
    let peer_a = make_peer(6000);
    let upstream_addr = joiner
        .socket_addr()
        .expect("test peer must have address");
    let request = ConnectRequest {
        desired_location: Location::random(),
        joiner: joiner.clone(),
        ttl: 3,
        visited: VisitedPeers::default(),
        uphill_budget: DEFAULT_UPHILL_BUDGET,
    };
    let mut state = RelayState {
        upstream_addr,
        request,
        forwarded_to: None,
        forwarded_at: None,
        observed_sent: false,
        accepted_locally: false,
        response_forwarded: false,
    };
    let first_ctx = TestRelayContext::new(self_loc.clone())
        .accept(false)
        .uphill_hop(Some(peer_a));
    let mut attempts = HashMap::new();
    let estimator = ConnectForwardEstimator::new();
    let recency = HashMap::new();
    let first =
        state.handle_request(&first_ctx, &recency, &mut attempts, &estimator, Instant::now());
    assert!(first.forward.is_some());

    // Forget the first forward and retry with nothing uphill to offer.
    state.forwarded_to = None;
    state.forwarded_at = None;
    let empty_ctx = TestRelayContext::new(self_loc).accept(false);
    let retry =
        state.handle_request(&empty_ctx, &recency, &mut attempts, &estimator, Instant::now());
    assert!(
        retry.rejected,
        "should reject when no uphill peers on retry"
    );
    assert!(retry.forward.is_none());
    assert!(retry.accept.is_none());
}
/// When a retry ends in local acceptance, the returned `accept` outcome must carry
/// the joiner, with a known socket address, alongside the response.
#[test]
fn relay_retry_acceptance_bundles_joiner_with_response() {
    let self_loc = make_peer(4000);
    let joiner = make_peer(5000);
    let peer_a = make_peer(6000);
    let upstream_addr = joiner
        .socket_addr()
        .expect("test peer must have address");
    let request = ConnectRequest {
        desired_location: Location::random(),
        joiner: joiner.clone(),
        ttl: 3,
        visited: VisitedPeers::default(),
        uphill_budget: DEFAULT_UPHILL_BUDGET,
    };
    let mut state = RelayState {
        upstream_addr,
        request,
        forwarded_to: None,
        forwarded_at: None,
        observed_sent: false,
        accepted_locally: false,
        response_forwarded: false,
    };
    let forward_ctx = TestRelayContext::new(self_loc.clone())
        .accept(false)
        .uphill_hop(Some(peer_a));
    let mut attempts = HashMap::new();
    let estimator = ConnectForwardEstimator::new();
    let recency = HashMap::new();
    let initial =
        state.handle_request(&forward_ctx, &recency, &mut attempts, &estimator, Instant::now());
    assert!(initial.forward.is_some(), "initial pass should forward uphill");
    assert!(
        initial.accept.is_none(),
        "initial pass should not accept (forwarding upstream)"
    );

    // Retry with no uphill hop, this time willing to accept locally.
    state.forwarded_to = None;
    state.forwarded_at = None;
    let accept_ctx = TestRelayContext::new(self_loc).accept(true);
    let retry =
        state.handle_request(&accept_ctx, &recency, &mut attempts, &estimator, Instant::now());
    let outcome = retry
        .accept
        .expect("retry must accept locally when no uphill peers remain");
    assert_eq!(
        outcome.joiner.pub_key(),
        joiner.pub_key(),
        "AcceptOutcome must carry the joiner alongside the response so the caller \
         cannot forget ring promotion (#3838)"
    );
    assert!(
        outcome.joiner.socket_addr().is_some(),
        "joiner address must be known so the acceptor can emit ConnectPeer / \
         ExpectPeerConnection against the observed address"
    );
}
/// One peer below `MIN_FORWARD_SCORE` and one above it: only the good peer survives.
#[test]
fn test_filter_low_score_peers_removes_bad_when_alternatives_exist() {
    let mut candidates: Vec<(f64, &str)> = Vec::from([(0.05, "bad"), (0.8, "good")]);
    super::filter_low_score_peers(&mut candidates, 0);
    assert_eq!(candidates.len(), 1);
    let (_, survivor) = candidates[0];
    assert_eq!(survivor, "good");
}
/// A single scored candidate with no unscored fallback is kept, even when its
/// score is below the threshold.
#[test]
fn test_filter_low_score_peers_keeps_sole_option() {
    let mut candidates: Vec<(f64, &str)> = Vec::with_capacity(1);
    candidates.push((0.05, "only"));
    super::filter_low_score_peers(&mut candidates, 0);
    assert_eq!(
        candidates.len(),
        1,
        "Single peer must be kept even with low score"
    );
}
/// With one unscored fallback available, a lone sub-threshold scored peer is dropped.
#[test]
fn test_filter_low_score_peers_removes_when_unscored_fallback_exists() {
    let unscored_fallbacks = 1;
    let mut candidates: Vec<(f64, &str)> = Vec::from([(0.05, "low")]);
    super::filter_low_score_peers(&mut candidates, unscored_fallbacks);
    assert!(
        candidates.is_empty(),
        "Low-score peer should be filtered when unscored fallback exists"
    );
}
/// If every candidate is below the threshold and there is no fallback, nothing
/// is removed.
#[test]
fn test_filter_low_score_peers_keeps_all_bad_when_no_alternatives() {
    let mut candidates: Vec<(f64, &str)> = Vec::from([(0.05, "bad1"), (0.10, "bad2")]);
    let before = candidates.len();
    super::filter_low_score_peers(&mut candidates, 0);
    assert_eq!(
        candidates.len(),
        before,
        "Must keep all low-score peers when no alternatives exist"
    );
}
/// Three real peers, all below `MIN_FORWARD_SCORE`, with no fallback: all are kept.
#[test]
fn test_filter_low_score_preserves_all_when_no_alternatives() {
    let mut scored = Vec::new();
    for (score, port) in [(0.08, 7001), (0.10, 7002), (0.12, 7003)] {
        scored.push((score, make_peer(port)));
    }
    filter_low_score_peers(&mut scored, 0);
    assert_eq!(
        scored.len(),
        3,
        "all peers should be preserved when no alternatives exist"
    );
}
/// With an unscored fallback present, the low-score peer is dropped and the peer
/// above the threshold remains.
#[test]
fn test_filter_low_score_removes_bad_when_alternatives_exist() {
    let good_peer = make_peer(7002);
    let expected_key = good_peer.pub_key().clone();
    let mut scored = vec![(0.08, make_peer(7001)), (0.30, good_peer)];
    // One unscored fallback candidate exists alongside the scored ones.
    filter_low_score_peers(&mut scored, 1);
    assert_eq!(
        scored.len(),
        1,
        "low-score peer should be filtered when alternatives exist"
    );
    let (_, survivor) = &scored[0];
    assert_eq!(survivor.pub_key(), &expected_key);
}
/// A `ConnectFailed` message survives a bincode round trip with both fields intact.
#[test]
fn test_connect_failed_serialization_roundtrip() {
    use bincode::{deserialize, serialize};
    let tx = Transaction::new::<ConnectMsg>();
    let addr: SocketAddr = "10.0.0.5:9000".parse().unwrap();
    let original = ConnectMsg::ConnectFailed {
        id: tx,
        failed_acceptor_addr: addr,
    };
    let wire = serialize(&original).expect("serialize ConnectFailed");
    let decoded: ConnectMsg = deserialize(&wire).expect("deserialize ConnectFailed");
    let (decoded_id, decoded_addr) = match decoded {
        ConnectMsg::ConnectFailed {
            id,
            failed_acceptor_addr,
        } => (id, failed_acceptor_addr),
        ConnectMsg::Request { .. }
        | ConnectMsg::Response { .. }
        | ConnectMsg::ObservedAddress { .. }
        | ConnectMsg::Rejected { .. } => {
            panic!("expected ConnectFailed, got: {decoded}")
        }
    };
    assert_eq!(decoded_id, tx);
    assert_eq!(decoded_addr, addr);
}
/// `ConnectFailed` returns its own id, has no requested location, and names
/// itself in its `Display` output.
#[test]
fn test_connect_failed_inner_message_traits() {
    let tx = Transaction::new::<ConnectMsg>();
    let msg = ConnectMsg::ConnectFailed {
        id: tx,
        failed_acceptor_addr: "10.0.0.5:9000".parse::<SocketAddr>().unwrap(),
    };
    // InnerMessage accessors.
    assert_eq!(*msg.id(), tx, "ConnectFailed should return its transaction id");
    assert_eq!(
        msg.requested_location(),
        None,
        "ConnectFailed should return None for requested_location"
    );
    // Display rendering.
    let display = msg.to_string();
    assert!(
        display.contains("ConnectFailed"),
        "Display should mention ConnectFailed: {display}"
    );
}
/// Pins the expected jitter-magnitude table. One failure gives no jitter; after
/// that the magnitude starts at 0.05, doubles per extra failure, and is capped at 0.25.
///
/// NOTE(review): this re-derives the formula locally and does not call the
/// production jitter code, so a change to the production formula would go
/// unnoticed here. Consider routing this through the real helper.
#[test]
fn test_jitter_magnitude_at_various_failure_counts() {
    fn jitter_magnitude(failures: u32) -> f64 {
        if failures > 1 {
            // The shift saturates at 3 (0.05 * 8 = 0.40) before the 0.25 cap
            // applies, which also keeps `1 << n` from overflowing for large counts.
            (0.05 * f64::from(1u32 << (failures - 2).min(3))).min(0.25)
        } else {
            0.0
        }
    }

    // Fixed table: an array, not a heap-allocated `vec!`.
    let cases = [
        (1u32, 0.0),
        (2, 0.05),
        (3, 0.10),
        (4, 0.20),
        (5, 0.25),
        (10, 0.25),
    ];
    for (failures, expected_magnitude) in cases {
        let magnitude = jitter_magnitude(failures);
        assert!(
            (magnitude - expected_magnitude).abs() < 1e-10,
            "failures={failures}: expected magnitude {expected_magnitude}, got {magnitude}"
        );
    }
}
/// The connect-jitter failure counter starts at zero, each increment returns the
/// updated count, and a reset clears it.
#[test]
fn test_connect_jitter_counter() {
    let manager = crate::ring::ConnectionManager::test_default();
    let at = Instant::now();
    assert_eq!(manager.connect_jitter_failure_count(), 0);

    let after_first = manager.increment_connect_jitter_failures(at);
    assert_eq!(after_first, 1);
    assert_eq!(manager.connect_jitter_failure_count(), 1);

    let after_second = manager.increment_connect_jitter_failures(at);
    assert_eq!(after_second, 2);

    manager.reset_connect_jitter_failures();
    assert_eq!(manager.connect_jitter_failure_count(), 0);
}
/// Accumulated connect-jitter failures decay with idle time before a new failure
/// is counted.
#[test]
fn test_connect_jitter_decay() {
    let cm = crate::ring::ConnectionManager::test_default();
    let now = Instant::now();
    for _ in 0..6 {
        cm.increment_connect_jitter_failures(now);
    }
    assert_eq!(cm.connect_jitter_failure_count(), 6);

    // After 15 idle minutes the 6 old failures have partly decayed. The value
    // returned by this increment (which includes the new failure) pins that decay.
    let later = now + Duration::from_secs(15 * 60);
    let count = cm.increment_connect_jitter_failures(later);
    assert_eq!(count, 4, "jitter should decay proportionally to idle time");

    // After another idle hour the old failures are gone entirely. The returned
    // count of 1 is just the failure recorded by this call.
    let much_later = later + Duration::from_secs(60 * 60);
    let count = cm.increment_connect_jitter_failures(much_later);
    assert_eq!(
        count, 1,
        "jitter should decay to zero after long idle period, leaving only the new failure"
    );
}
/// An unknown acceptor starts at 0.5. Three failures push it to about 0.2, and a
/// subsequent success raises it again.
#[test]
fn test_acceptor_reliability_tracking() {
    let cm = crate::ring::ConnectionManager::test_default();
    let addr: SocketAddr = "10.0.0.5:9000".parse().unwrap();
    let now = Instant::now();

    let initial = cm.peer_acceptor_reliability(addr, now);
    assert!(
        (initial - 0.5).abs() < f64::EPSILON,
        "unknown peer should have 0.5 reliability"
    );

    for _ in 0..3 {
        cm.record_acceptor_outcome(addr, false, now);
    }
    let after_failures = cm.peer_acceptor_reliability(addr, now);
    assert!(
        (after_failures - 0.2).abs() < 0.01,
        "3 failures should give ~0.2 reliability, got {}",
        after_failures
    );

    cm.record_acceptor_outcome(addr, true, now);
    let after_success = cm.peer_acceptor_reliability(addr, now);
    assert!(
        after_success > 0.2,
        "success should increase reliability from 0.2, got {}",
        after_success
    );
}
/// The skip list contains the local address and visited peers, but not an
/// ordinary unvisited peer.
#[test]
fn test_skip_list_skips_self_and_visited() {
    let tx = Transaction::new::<ConnectMsg>();
    let mut visited = VisitedPeers::new(&tx);
    let [self_addr, visited_addr, normal_addr]: [SocketAddr; 3] = [
        "10.0.0.1:5000".parse().unwrap(),
        "10.0.0.2:5000".parse().unwrap(),
        "10.0.0.3:5000".parse().unwrap(),
    ];
    visited.mark_visited(visited_addr);
    let skip = SkipListWithSelf {
        visited: &visited,
        self_addr: Some(self_addr),
    };
    let self_skipped = skip.has_element(self_addr);
    let visited_skipped = skip.has_element(visited_addr);
    let normal_skipped = skip.has_element(normal_addr);
    assert!(self_skipped, "self should be skipped");
    assert!(visited_skipped, "visited peer should be skipped");
    assert!(!normal_skipped, "normal peer should not be skipped");
}
/// An address marked in the visited filter is afterwards reported as visited.
///
/// NOTE(review): despite its name, this test only calls `VisitedPeers::mark_visited`
/// and `probably_visited` directly. It does not drive any `ConnectFailed` handling.
#[test]
fn test_bloom_filter_excludes_failed_acceptor() {
    let tx = Transaction::new::<ConnectMsg>();
    let mut visited = VisitedPeers::new(&tx);
    let failed_addr = "10.0.0.99:9000".parse::<SocketAddr>().unwrap();
    let seen_before = visited.probably_visited(failed_addr);
    assert!(!seen_before, "addr should not be visited initially");
    visited.mark_visited(failed_addr);
    let seen_after = visited.probably_visited(failed_addr);
    assert!(
        seen_after,
        "failed addr should be marked as visited after ConnectFailed processing"
    );
}
/// A `RelayState` can have `response_forwarded` set while still keeping
/// `forwarded_to`.
///
/// NOTE(review): this only builds a `RelayState` and reads back the fields it just
/// set. It does not call any timeout-handling code, so the behavior named in the
/// test title is not actually exercised here.
#[test]
fn test_response_forwarded_prevents_false_timeout() {
    let relay_peer = make_peer(5001);
    let request = ConnectRequest {
        desired_location: Location::new(0.3),
        joiner: make_peer(5002),
        ttl: 5,
        visited: VisitedPeers::new(&Transaction::new::<ConnectMsg>()),
        uphill_budget: DEFAULT_UPHILL_BUDGET,
    };
    let state = RelayState {
        upstream_addr: "10.0.0.1:5000".parse().unwrap(),
        request,
        forwarded_to: Some(relay_peer),
        forwarded_at: Some(Instant::now()),
        observed_sent: false,
        accepted_locally: false,
        response_forwarded: true,
    };
    assert!(state.response_forwarded, "response_forwarded should be set");
    assert!(
        state.forwarded_to.is_some(),
        "forwarded_to should still be preserved for ConnectFailed propagation"
    );
}
/// Addresses passed as exclusions to `initiate_join_request` are pre-marked in
/// the visited filter; other addresses are not.
#[test]
fn test_initiate_join_request_excludes_connected_peers() {
    let own = make_peer(8001);
    let target = make_peer(8002);
    let desired_location = Location::try_from(0.5).unwrap();
    let estimator = Arc::new(RwLock::new(ConnectForwardEstimator::new()));
    let [connected_peer1, connected_peer2, unconnected_peer]: [SocketAddr; 3] = [
        "10.0.0.10:9000".parse().unwrap(),
        "10.0.0.11:9001".parse().unwrap(),
        "10.0.0.99:9999".parse().unwrap(),
    ];
    let exclude_addrs = vec![connected_peer1, connected_peer2];
    let (tx, _op, msg) = ConnectOp::initiate_join_request(
        own.clone(),
        target.clone(),
        desired_location,
        10,
        3,
        estimator,
        &exclude_addrs,
    );
    let payload = match msg {
        ConnectMsg::Request { payload, .. } => payload,
        ConnectMsg::Response { .. }
        | ConnectMsg::ObservedAddress { .. }
        | ConnectMsg::Rejected { .. }
        | ConnectMsg::ConnectFailed { .. } => panic!("Expected ConnectMsg::Request"),
    };
    let filter = payload.visited.with_transaction(&tx);
    assert!(
        filter.probably_visited(connected_peer1),
        "connected peer 1 should be marked visited to prevent routing back to it"
    );
    assert!(
        filter.probably_visited(connected_peer2),
        "connected peer 2 should be marked visited to prevent routing back to it"
    );
    assert!(
        !filter.probably_visited(unconnected_peer),
        "unconnected peer should not be in the visited filter"
    );
}
/// Under a fixed RNG seed, distances to `gap_target` picks must have a smaller
/// median than a ±0.10 uniform-jitter baseline, and that median must be below 0.05.
#[test]
fn test_gap_target_produces_shorter_connections_than_jitter() {
    use crate::topology::small_world_rand::gap_target;
    // Number of draws for each strategy; the median is taken at SAMPLES / 2.
    const SAMPLES: usize = 100;
    // Half-width of the uniform jitter used as the comparison baseline.
    const JITTER_MAGNITUDE: f64 = 0.10;

    let _guard = GlobalRng::seed_guard(42);
    let my_location = Location::new(0.5);
    let existing_connections = [
        Location::new(0.501),
        Location::new(0.51),
        Location::new(0.55),
        Location::new(0.7),
        Location::new(0.9),
    ];
    let distances: Vec<f64> = existing_connections
        .iter()
        .map(|loc| my_location.distance(*loc).as_f64())
        .collect();

    let mut gap_target_distances = Vec::with_capacity(SAMPLES);
    let mut jitter_distances = Vec::with_capacity(SAMPLES);
    for _ in 0..SAMPLES {
        let target = gap_target(my_location, distances.iter().copied());
        gap_target_distances.push(my_location.distance(target).as_f64());
        let uniform_01 = (GlobalRng::random_u64() as f64) / (u64::MAX as f64);
        let offset = JITTER_MAGNITUDE * (2.0 * uniform_01 - 1.0);
        let jittered = (my_location.as_f64() + offset).rem_euclid(1.0);
        jitter_distances.push(my_location.distance(Location::new(jittered)).as_f64());
    }

    // `total_cmp` is a total order, so a stray NaN cannot make the sort panic
    // (which `partial_cmp(..).unwrap()` would).
    gap_target_distances.sort_unstable_by(f64::total_cmp);
    jitter_distances.sort_unstable_by(f64::total_cmp);
    let gap_median = gap_target_distances[SAMPLES / 2];
    let jitter_median = jitter_distances[SAMPLES / 2];
    assert!(
        gap_median < jitter_median,
        "gap_target median ({gap_median:.4}) should be shorter than jitter median ({jitter_median:.4})"
    );
    assert!(
        gap_median < 0.05,
        "gap_target median ({gap_median:.4}) should be < 0.05 (Kleinberg-optimal produces short connections)"
    );
}
}