use crate::{message::TransactionType, ring::Location};
use connection_evaluator::ConnectionEvaluator;
use meter::Meter;
use outbound_request_counter::OutboundRequestCounter;
use request_density_tracker::{CachedDensityMap, RequestDensityTracker};
use std::cmp::Ordering;
use std::collections::BTreeMap;
use tokio::time::Instant;
use tracing::{Level, debug, event, info, span, trace, warn};
pub mod connection_evaluator;
mod constants;
pub(crate) mod meter;
pub(crate) mod outbound_request_counter;
pub(crate) mod rate;
pub mod request_density_tracker;
pub(crate) mod running_average;
pub(crate) mod small_world_rand;
use crate::ring::{Connection, PeerKeyLocation};
use crate::topology::meter::{AttributionSource, ResourceType};
use crate::topology::rate::{Rate, RateProportion};
use constants::*;
use request_density_tracker::DensityMapError;
/// Tracks resource usage and outbound request statistics and decides whether
/// connections should be added, removed or swapped (see `adjust_topology`).
pub(crate) struct TopologyManager {
/// Bandwidth and connection-count limits consulted when adjusting topology.
limits: Limits,
/// Receives every resource usage report and answers usage-rate queries.
meter: Meter,
/// Earliest report time seen per attribution source; compared against
/// `SOURCE_RAMP_UP_DURATION` to detect sources that are still ramping up.
source_creation_times: BTreeMap<AttributionSource, Instant>,
/// Evaluator built with `SLOW_CONNECTION_EVALUATOR_WINDOW_DURATION`.
slow_connection_evaluator: ConnectionEvaluator,
/// Evaluator built with `FAST_CONNECTION_EVALUATOR_WINDOW_DURATION`.
fast_connection_evaluator: ConnectionEvaluator,
/// Samples the target location of every recorded outbound request.
request_density_tracker: RequestDensityTracker,
/// Counts outbound requests per recipient peer.
pub(crate) outbound_request_counter: OutboundRequestCounter,
/// Density map derived from `request_density_tracker` by `refresh_cache`.
cached_density_map: CachedDensityMap,
/// Selects which evaluator makes the decision in
/// `evaluate_new_connection_with_score`.
connection_acquisition_strategy: ConnectionAcquisitionStrategy,
}
impl TopologyManager {
pub(crate) fn new(limits: Limits) -> Self {
TopologyManager {
meter: Meter::new_with_window_size(100),
limits,
source_creation_times: BTreeMap::new(),
slow_connection_evaluator: ConnectionEvaluator::new(
SLOW_CONNECTION_EVALUATOR_WINDOW_DURATION,
),
fast_connection_evaluator: ConnectionEvaluator::new(
FAST_CONNECTION_EVALUATOR_WINDOW_DURATION,
),
request_density_tracker: RequestDensityTracker::new(
REQUEST_DENSITY_TRACKER_WINDOW_SIZE,
),
cached_density_map: CachedDensityMap::new(),
outbound_request_counter: OutboundRequestCounter::new(
OUTBOUND_REQUEST_COUNTER_WINDOW_SIZE,
),
connection_acquisition_strategy: ConnectionAcquisitionStrategy::Fast,
}
}
/// Rebuilds the cached density map from the request density tracker and the
/// given neighbor locations.
///
/// # Errors
/// Propagates any error returned by `CachedDensityMap::set`.
pub(crate) fn refresh_cache(
    &mut self,
    neighbor_locations: &BTreeMap<Location, Vec<Connection>>,
) -> Result<(), DensityMapError> {
    let tracker = &self.request_density_tracker;
    self.cached_density_map.set(tracker, neighbor_locations)?;
    Ok(())
}
/// Logs an outbound request and records it in the request statistics.
///
/// The bookkeeping (density sample for `target`, request count for
/// `recipient`) is delegated to `report_outbound_request`.
pub(crate) fn record_request(
    &mut self,
    recipient: PeerKeyLocation,
    target: Location,
    request_type: TransactionType,
) {
    debug!(
        request_type = %request_type,
        recipient = %recipient,
        target_location = %target,
        "Recording request sent to peer"
    );
    self.report_outbound_request(recipient, target);
}
/// Feeds `score` to both connection evaluators and returns the verdict of
/// the one selected by the current acquisition strategy.
///
/// The evaluator that is not selected only records the score, and it does
/// so before the selected evaluator records and evaluates it.
pub(crate) fn evaluate_new_connection_with_score(
    &mut self,
    score: f64,
    current_time: Instant,
) -> bool {
    let (observer, decider) = match self.connection_acquisition_strategy {
        ConnectionAcquisitionStrategy::Slow => (
            &mut self.fast_connection_evaluator,
            &mut self.slow_connection_evaluator,
        ),
        ConnectionAcquisitionStrategy::Fast => (
            &mut self.slow_connection_evaluator,
            &mut self.fast_connection_evaluator,
        ),
    };
    observer.record_only_with_current_time(score, current_time);
    decider.record_and_eval_with_current_time(score, current_time)
}
/// Returns the location with the highest request density in the cached
/// density map (test-only helper).
///
/// If the density map cannot produce a maximum, the peer's own location
/// (`this_peer_location`) is returned instead.
///
/// # Errors
/// Returns [`DensityMapError::EmptyNeighbors`] when no density map has been
/// cached yet.
#[cfg(test)]
fn get_best_candidate_location(
    &self,
    this_peer_location: &Location,
) -> Result<Location, DensityMapError> {
    let density_map = self
        .cached_density_map
        .get()
        .ok_or(DensityMapError::EmptyNeighbors)?;
    let best_location = match density_map.get_max_density() {
        Ok(location) => {
            debug!(location = %location, "Max density found");
            location
        }
        Err(_) => {
            // The fallback is this peer's own location, not a random one;
            // the log message states that explicitly.
            debug!(
                fallback_location = %this_peer_location,
                "An error occurred while getting max density, falling back to this peer's own location"
            );
            *this_peer_location
        }
    };
    Ok(best_location)
}
/// Replaces the limits used by this manager (test-only helper).
///
/// Declared private: the former `pub(self)` qualifier grants exactly the
/// same visibility and is flagged by clippy as `needless_pub_self`.
#[cfg(test)]
fn update_limits(&mut self, limits: Limits) {
    self.limits = limits;
}
/// Splits `value` evenly across `attributions` and reports one share per
/// source via `report_resource_usage`.
///
/// An empty `attributions` slice results in no reports: the share is
/// computed but never used.
#[allow(dead_code)]
pub(crate) fn report_split_resource_usage(
    &mut self,
    attributions: &[AttributionSource],
    resource: ResourceType,
    value: f64,
    at_time: Instant,
) {
    let share = value / attributions.len() as f64;
    attributions
        .iter()
        .for_each(|source| self.report_resource_usage(source, resource, share, at_time));
}
/// Reports `amount` of `resource` used by `attribution` at `at_time`.
///
/// Also keeps `source_creation_times` pointing at the earliest report time
/// seen for the source: a new source is inserted, and an existing entry is
/// only moved backwards in time.
#[allow(dead_code)]
pub(crate) fn report_resource_usage(
    &mut self,
    attribution: &AttributionSource,
    resource: ResourceType,
    amount: f64,
    at_time: Instant,
) {
    match self.source_creation_times.get_mut(attribution) {
        Some(earliest_seen) => {
            if at_time < *earliest_seen {
                *earliest_seen = at_time;
            }
        }
        None => {
            self.source_creation_times
                .insert(attribution.clone(), at_time);
        }
    }
    self.meter.report(attribution, resource, amount, at_time);
}
pub(crate) fn report_outbound_request(&mut self, peer: PeerKeyLocation, target: Location) {
self.request_density_tracker.sample(target);
self.outbound_request_counter.record_request(peer);
}
/// Computes the total and per-source usage rate of `resource_type` at `now`.
///
/// A source whose first report is no older than `SOURCE_RAMP_UP_DURATION`
/// is charged the meter's adjusted usage rate for the resource; any other
/// source is charged its own attributed rate. A missing rate counts as zero.
fn extrapolated_usage(&mut self, resource_type: &ResourceType, now: Instant) -> Usage {
    let function_span = span!(Level::DEBUG, "extrapolated_usage_function");
    let _enter = function_span.enter();
    let mut total_usage: Rate = Rate::new_per_second(0.0);
    let mut usage_per_source: BTreeMap<AttributionSource, Rate> = BTreeMap::new();

    // Phase 1: snapshot every source with its ramp-up flag so the map is no
    // longer borrowed when `self` is needed mutably in phase 2.
    let collect_data_span = span!(Level::DEBUG, "collect_data");
    let collect_guard = collect_data_span.enter();
    debug!("Collecting data from source_creation_times");
    let usage_data: Vec<(AttributionSource, bool)> = self
        .source_creation_times
        .iter()
        .map(|(source, creation_time)| {
            let ramping_up = now.duration_since(*creation_time) <= SOURCE_RAMP_UP_DURATION;
            debug!(
                "Source: {:?}, Creation time: {:?}, Ramping up: {}",
                source, creation_time, ramping_up
            );
            (source.clone(), ramping_up)
        })
        .collect();
    drop(collect_guard);

    // Phase 2: resolve a rate for every source and accumulate the totals.
    let process_data_span = span!(Level::DEBUG, "process_data");
    let process_guard = process_data_span.enter();
    debug!("Processing data for usage calculation");
    for (source, ramping_up) in usage_data {
        let usage_rate: Option<Rate> = match ramping_up {
            true => {
                debug!("Source {:?} is ramping up", source);
                self.meter.get_adjusted_usage_rate(resource_type, now)
            }
            false => {
                debug!("Source {:?} is not ramping up", source);
                self.attributed_usage_rate(&source, resource_type, now)
            }
        };
        debug!("Usage rate for source {:?}: {:?}", source, usage_rate);
        let effective_rate = usage_rate.unwrap_or(Rate::new_per_second(0.0));
        total_usage += effective_rate;
        usage_per_source.insert(source, effective_rate);
    }
    drop(process_guard);

    debug!("Total usage: {:?}", total_usage);
    debug!("Usage per source: {:?}", usage_per_source);
    Usage {
        total: total_usage,
        per_source: usage_per_source,
    }
}
/// Returns the usage rate of `resource_type` attributed to `source` at `now`.
///
/// Forwards directly to `Meter::attributed_usage_rate`; the result is `None`
/// whenever the meter returns `None`.
pub(crate) fn attributed_usage_rate(
&mut self,
source: &AttributionSource,
resource_type: &ResourceType,
now: Instant,
) -> Option<Rate> {
self.meter.attributed_usage_rate(source, resource_type, now)
}
/// Decides how the set of connections should change at `at_time`.
///
/// Decision order:
/// 1. Below `min_connections`: always request the missing connections.
/// 2. Above `max_connections`: select a peer to remove, with a fallback
///    removal if the regular selection yields `NoChange`.
/// 3. Usage below the minimum desired proportion: add one connection unless
///    the low-usage connection cap has been reached.
/// 4. Usage above the maximum desired proportion: remove a peer unless
///    already at `min_connections`.
/// 5. Otherwise no change, except that a topology swap may be attempted.
///
/// `neighbor_locations` maps each neighbor location to its connections and
/// `my_location` is this peer's location, if known.
pub(crate) fn adjust_topology(
&mut self,
neighbor_locations: &BTreeMap<Location, Vec<Connection>>,
my_location: &Option<Location>,
at_time: Instant,
current_connections: usize,
) -> TopologyAdjustment {
// Step 1: below the minimum, resource usage is not consulted at all.
if current_connections < self.limits.min_connections {
let needed = self.limits.min_connections - current_connections;
// With enough existing connections, targets come from `sample_targets`;
// otherwise the bootstrap targets are used.
if current_connections >= DENSITY_SELECTION_THRESHOLD {
let locations = Self::sample_targets(my_location, neighbor_locations, needed);
return TopologyAdjustment::AddConnections(locations);
}
let locations = bootstrap_target_locations(my_location, current_connections, needed);
// Debug builds only: trace the zero-connection bootstrap, throttled to at
// most once per 10 seconds per thread via a thread-local timestamp.
#[cfg(debug_assertions)]
if current_connections == 0 {
thread_local! {
static LAST_LOG: std::cell::RefCell<Instant> = std::cell::RefCell::new(Instant::now());
}
if LAST_LOG.with(|last_log| {
last_log.borrow().elapsed() > std::time::Duration::from_secs(10)
}) {
LAST_LOG.with(|last_log| {
tracing::trace!(
minimum_num_peers_hard_limit = self.limits.min_connections,
num_peers = current_connections,
to_add = needed,
"Bootstrap: adding first connection at own location"
);
*last_log.borrow_mut() = Instant::now();
});
}
}
return TopologyAdjustment::AddConnections(locations);
}
let increase_usage_if_below: RateProportion =
RateProportion::new(MINIMUM_DESIRED_RESOURCE_USAGE_PROPORTION);
let decrease_usage_if_above: RateProportion =
RateProportion::new(MAXIMUM_DESIRED_RESOURCE_USAGE_PROPORTION);
// The resource type with the highest usage proportion drives the decision.
let (resource_type, usage_proportion) = self.calculate_usage_proportion(at_time);
// Set when usage is high but we are at min_connections: no swap is
// attempted in that case.
let mut suppress_swap = false;
// NOTE(review): every branch below yields `Ok`, so the `anyhow::Result`
// wrapper never carries an error in the code visible here.
let adjustment: anyhow::Result<TopologyAdjustment> = if current_connections
> self.limits.max_connections
{
debug!(
current_connections,
max_connections = self.limits.max_connections,
"Above max connections, removing"
);
self.update_connection_acquisition_strategy(ConnectionAcquisitionStrategy::Slow);
Ok(self.select_connections_to_remove(
&resource_type,
at_time,
my_location,
neighbor_locations,
))
} else if usage_proportion < increase_usage_if_below {
// Cap on growth driven by low usage: min_connections scaled by the
// growth factor, never above max_connections.
let low_usage_cap = ((self.limits.min_connections as f64
* LOW_USAGE_CONNECTION_GROWTH_FACTOR) as usize)
.min(self.limits.max_connections);
if current_connections >= low_usage_cap {
debug!(
current_connections,
low_usage_cap,
"Resource usage low but at low-usage connection cap — not adding"
);
Ok(TopologyAdjustment::NoChange)
} else {
debug!(
resource_type = ?resource_type,
usage_proportion = ?usage_proportion,
current_connections,
low_usage_cap,
"Resource usage below threshold, adding connection"
);
self.update_connection_acquisition_strategy(ConnectionAcquisitionStrategy::Fast);
let locations = Self::sample_targets(my_location, neighbor_locations, 1);
Ok(TopologyAdjustment::AddConnections(locations))
}
} else if usage_proportion > decrease_usage_if_above {
if current_connections <= self.limits.min_connections {
debug!(
current_connections,
min_connections = self.limits.min_connections,
"Resource usage high but at min_connections — not removing"
);
suppress_swap = true;
Ok(TopologyAdjustment::NoChange)
} else {
debug!(
resource_type = ?resource_type,
usage_proportion = ?usage_proportion,
"Resource usage above threshold, removing connection"
);
Ok(self.select_connections_to_remove(
&resource_type,
at_time,
my_location,
neighbor_locations,
))
}
} else {
Ok(TopologyAdjustment::NoChange)
};
// Over capacity: if the regular selection produced no change, drop a
// fallback peer so the max-connections cap is still enforced. No swap is
// attempted on this path.
if current_connections > self.limits.max_connections {
let mut adj = adjustment.unwrap_or(TopologyAdjustment::NoChange);
if matches!(adj, TopologyAdjustment::NoChange) {
if let Some(peer) = select_fallback_peer_to_drop(neighbor_locations, my_location) {
info!(
current_connections,
max_connections = self.limits.max_connections,
peer = %peer,
"Enforcing max-connections cap via fallback removal"
);
adj = TopologyAdjustment::RemoveConnections(vec![peer]);
} else {
warn!(
current_connections,
max_connections = self.limits.max_connections,
"Over capacity but no removable peer found"
);
}
}
return adj;
}
let adj = adjustment.unwrap_or(TopologyAdjustment::NoChange);
// A swap is only considered when it is not suppressed and no removal was
// chosen; a triggered swap replaces the adjustment (including an
// `AddConnections` result).
if !suppress_swap && !matches!(adj, TopologyAdjustment::RemoveConnections(_)) {
let swap =
self.maybe_swap_connection(my_location, neighbor_locations, current_connections);
if !matches!(swap, TopologyAdjustment::NoChange) {
return swap;
}
}
adj
}
/// Picks `count` target locations for new connections.
///
/// With a known own location, even-indexed targets come from
/// `gap_target_directional` (each pick is appended to the signed-distance
/// list so later picks see it) and odd-indexed targets come from
/// `kleinberg_target`. Without an own location every target is random.
fn sample_targets(
    my_location: &Option<Location>,
    neighbor_locations: &BTreeMap<Location, Vec<Connection>>,
    count: usize,
) -> Vec<Location> {
    let Some(own) = my_location else {
        return (0..count).map(|_| Location::random()).collect();
    };

    let mut occupied: Vec<f64> = neighbor_locations
        .keys()
        .map(|neighbor| own.signed_distance(*neighbor))
        .collect();

    (0..count)
        .map(|index| {
            if index % 2 != 0 {
                return small_world_rand::kleinberg_target(*own);
            }
            let picked = small_world_rand::gap_target_directional(*own, &occupied);
            occupied.push(own.signed_distance(picked));
            picked
        })
        .collect()
}
/// Returns the resource type with the highest usage proportion at `at_time`,
/// together with that proportion.
///
/// The proportion is the extrapolated total usage relative to the configured
/// limit for that resource type. Incomparable proportions are treated as
/// equal.
///
/// # Panics
/// Panics if `ResourceType::all()` yields no resource types.
fn calculate_usage_proportion(&mut self, at_time: Instant) -> (ResourceType, RateProportion) {
    let mut proportions = BTreeMap::new();
    for resource_type in ResourceType::all() {
        let extrapolated = self.extrapolated_usage(&resource_type, at_time);
        let share = extrapolated
            .total
            .proportion_of(&self.limits.get(&resource_type));
        proportions.insert(resource_type, share);
    }
    proportions
        .iter()
        .max_by(|(_, lhs), (_, rhs)| lhs.partial_cmp(rhs).unwrap_or(Ordering::Equal))
        .map(|(resource, share)| (*resource, *share))
        .unwrap()
}
/// Sets the strategy that `evaluate_new_connection_with_score` uses to pick
/// the deciding connection evaluator.
fn update_connection_acquisition_strategy(
&mut self,
new_strategy: ConnectionAcquisitionStrategy,
) {
self.connection_acquisition_strategy = new_strategy;
}
/// Chooses at most one peer to disconnect, based on how much routing value
/// the peer provides relative to its usage of the exceeded resource.
///
/// Only peer sources that are past their ramp-up period are considered.
/// Each candidate gets an effective value and the candidate with the lowest
/// value is recommended for removal. Candidates for which `composite_score`
/// returns `None` are skipped; if that leaves no pick while candidates
/// exist, `select_fallback_peer_to_drop` is used. Returns `NoChange` when no
/// peer can be selected.
fn select_connections_to_remove(
&mut self,
exceeded_usage_for_resource_type: &ResourceType,
at_time: Instant,
my_location: &Option<Location>,
neighbor_locations: &BTreeMap<Location, Vec<Connection>>,
) -> TopologyAdjustment {
let function_span = span!(Level::INFO, "remove_connections");
let _enter = function_span.enter();
// Signed distance from our location to every neighbor location; empty
// when our own location is unknown.
let all_signed_distances: Vec<f64> = match my_location {
Some(my_loc) => neighbor_locations
.keys()
.map(|nloc| my_loc.signed_distance(*nloc))
.collect(),
None => Vec::new(),
};
// For each connected peer: (signed distance of its location, number of
// connections sharing that location). Empty when our location is unknown.
let peer_to_loc_info: std::collections::HashMap<PeerKeyLocation, (f64, usize)> =
match my_location {
Some(my_loc) => neighbor_locations
.iter()
.flat_map(|(loc, conns)| {
let signed_dist = my_loc.signed_distance(*loc);
let count = conns.len();
conns
.iter()
.map(move |c| (c.location.clone(), (signed_dist, count)))
})
.collect(),
None => std::collections::HashMap::new(),
};
// A removal candidate and the inputs to its effective value.
struct Candidate {
peer: PeerKeyLocation,
// Outbound request count divided by usage per second (0 if no usage).
routing_value: f64,
// `None` when the peer is not present in `peer_to_loc_info`.
peer_signed_distance: Option<f64>,
peers_at_location: usize,
}
let mut candidates = Vec::new();
for (source, source_usage) in self
.meter
.get_usage_rates(exceeded_usage_for_resource_type, at_time)
{
// Skip sources still ramping up and sources with no recorded creation time.
if let Some(creation_time) = self.source_creation_times.get(&source) {
if at_time.duration_since(*creation_time) <= SOURCE_RAMP_UP_DURATION {
continue;
}
} else {
continue;
}
// Only sources attributed to a peer can be disconnected.
if let AttributionSource::Peer(peer) = source {
let request_count = self.outbound_request_counter.get_request_count(&peer) as f64;
let per_sec = source_usage.per_second();
// Guard against dividing by zero usage.
let routing_value = if per_sec > 0.0 {
request_count / per_sec
} else {
0.0
};
let (peer_signed_distance, peers_at_location) = match peer_to_loc_info.get(&peer) {
Some(&(sd, count)) => (Some(sd), count),
None => (None, 1),
};
candidates.push(Candidate {
peer,
routing_value,
peer_signed_distance,
peers_at_location,
});
}
}
// Routing values are normalized against the largest one.
let max_routing = candidates
.iter()
.map(|c| c.routing_value)
.fold(0.0_f64, f64::max);
// Lowest effective value seen so far.
let mut worst: Option<(PeerKeyLocation, f64)> = None;
for c in &candidates {
let normalized_routing = if max_routing > 0.0 {
c.routing_value / max_routing
} else {
0.0
};
let effective_value = match c.peer_signed_distance {
Some(signed_dist) => {
match composite_score(
normalized_routing,
signed_dist,
&all_signed_distances,
c.peers_at_location,
) {
Some(score) => score,
// No score available for this peer: never pick it here.
None => {
continue;
}
}
}
// Unknown position: add the full topology weight to the routing value.
None => normalized_routing + TOPOLOGY_WEIGHT,
};
event!(
Level::DEBUG,
routing_value = c.routing_value,
normalized_routing,
effective_value,
peer = ?c.peer,
);
// Strict `<` keeps the first candidate on ties.
if let Some((_, worst_effective_value)) = worst {
if effective_value < worst_effective_value {
worst = Some((c.peer.clone(), effective_value));
}
} else {
worst = Some((c.peer.clone(), effective_value));
}
}
if let Some((peer, _)) = worst {
event!(Level::INFO, action = "Recommend peer for removal", peer = ?peer);
TopologyAdjustment::RemoveConnections(vec![peer])
} else if !candidates.is_empty() {
// Every candidate was skipped above; fall back to the gap-based choice.
event!(Level::WARN, "All peers topology-critical, using fallback");
match select_fallback_peer_to_drop(neighbor_locations, my_location) {
Some(peer) => TopologyAdjustment::RemoveConnections(vec![peer]),
None => TopologyAdjustment::NoChange,
}
} else {
event!(Level::WARN, "Couldn't find a suitable peer to remove");
TopologyAdjustment::NoChange
}
}
/// Probabilistically proposes swapping one connection for a gap-targeted one.
///
/// Returns `NoChange` when below `min_connections`, when our location is
/// unknown, when the largest directional gap does not exceed the expected
/// gap, when the random roll does not trigger, or when no peer has a
/// composite score. Otherwise returns `SwapConnection` with the
/// lowest-scoring peer to remove and a location from
/// `gap_target_directional` to add.
fn maybe_swap_connection(
&self,
my_location: &Option<Location>,
neighbor_locations: &BTreeMap<Location, Vec<Connection>>,
current_connections: usize,
) -> TopologyAdjustment {
if current_connections < self.limits.min_connections {
return TopologyAdjustment::NoChange;
}
let Some(&my_loc) = my_location.as_ref() else {
return TopologyAdjustment::NoChange;
};
let signed_distances: Vec<f64> = neighbor_locations
.keys()
.map(|nloc| my_loc.signed_distance(*nloc))
.collect();
let (largest_gap, side_count) =
small_world_rand::largest_gap_size_directional(&signed_distances);
// Expected gap is max(ln(k), EXPECTED_GAP_FLOOR) / k for k peers on the
// side, or just the floor when that side is empty.
let expected_gap = if side_count == 0 {
EXPECTED_GAP_FLOOR
} else {
let k = side_count as f64;
k.ln().max(EXPECTED_GAP_FLOOR) / k
};
// Relative excess of the largest gap over the expected gap, clamped to
// [0, 1], scales the per-tick swap probability.
let excess = ((largest_gap / expected_gap) - 1.0).clamp(0.0, 1.0);
let swap_prob = excess * MAX_SWAP_PROB_PER_TICK;
if swap_prob <= 0.0 {
return TopologyAdjustment::NoChange;
}
let roll: f64 = crate::config::GlobalRng::random_range(0.0..1.0);
if roll >= swap_prob {
trace!(
largest_gap,
expected_gap, excess, swap_prob, roll, "Topology swap check: not triggered"
);
return TopologyAdjustment::NoChange;
}
// One entry per connection: (peer, signed distance of its location,
// outbound request count, number of connections at that location).
let peers_with_routing: Vec<_> = neighbor_locations
.iter()
.flat_map(|(loc, conns)| {
let count = conns.len();
conns.iter().map(move |conn| (loc, conn, count))
})
.map(|(loc, conn, count)| {
let peer_signed_dist = my_loc.signed_distance(*loc);
let routing_value = self
.outbound_request_counter
.get_request_count(&conn.location) as f64;
(
conn.location.clone(),
peer_signed_dist,
routing_value,
count,
)
})
.collect();
let max_routing = peers_with_routing
.iter()
.map(|(_, _, rv, _)| *rv)
.fold(0.0_f64, f64::max);
// Pick the peer with the lowest composite score; peers for which
// `composite_score` returns `None` are excluded by `filter_map`.
let remove = peers_with_routing
.into_iter()
.filter_map(|(peer, peer_signed_dist, routing_value, peers_at_loc)| {
let normalized = if max_routing > 0.0 {
routing_value / max_routing
} else {
0.0
};
let score = composite_score(
normalized,
peer_signed_dist,
&signed_distances,
peers_at_loc,
)?;
Some((peer, score))
})
.min_by(|(_, a), (_, b)| a.partial_cmp(b).unwrap_or(Ordering::Equal))
.map(|(peer, _)| peer);
let Some(remove) = remove else {
return TopologyAdjustment::NoChange;
};
let add_location = small_world_rand::gap_target_directional(my_loc, &signed_distances);
info!(
largest_gap,
expected_gap,
excess,
swap_prob,
remove_peer = %remove,
add_target = %add_location,
"Topology swap triggered: replacing least-routed peer with gap-targeted connection"
);
TopologyAdjustment::SwapConnection {
remove,
add_location,
}
}
}
/// Produces target locations while the peer has fewer than
/// `DENSITY_SELECTION_THRESHOLD` connections.
///
/// * No connections yet: a single target, namely the own location if known
///   and a random location otherwise (`needed` is ignored).
/// * Own location known: the own location followed by
///   `own + i / needed` for `i` in `1..needed`, passed through
///   `Location::new_rounded`.
/// * Own location unknown: `needed` random locations.
fn bootstrap_target_locations(
    my_location: &Option<Location>,
    current_connections: usize,
    needed: usize,
) -> Vec<Location> {
    debug_assert!(current_connections < DENSITY_SELECTION_THRESHOLD);
    match (my_location, current_connections) {
        (Some(own), 0) => vec![*own],
        (None, 0) => vec![Location::random()],
        (Some(own), _) => std::iter::once(*own)
            .chain((1..needed).map(|step| {
                let offset = step as f64 / needed as f64;
                Location::new_rounded(own.as_f64() + offset)
            }))
            .collect(),
        (None, _) => (0..needed).map(|_| Location::random()).collect(),
    }
}
/// Rates how much removing a peer would hurt the topology.
///
/// * Fewer than three known distances: `Some(1.0)`.
/// * More than one peer at the same location: `Some(0.0)`.
/// * Otherwise the gap reported by `removal_gap_directional`, divided by an
///   expected gap of `2 / (k + 1)`, where `k` is the same-side peer count
///   (at least 1). `None` is returned when that ratio exceeds
///   `TOPOLOGY_PROTECTION_THRESHOLD`.
fn topology_value(
    peer_signed_distance: f64,
    all_signed_distances: &[f64],
    peers_at_same_location: usize,
) -> Option<f64> {
    if all_signed_distances.len() < 3 {
        return Some(1.0);
    }
    if peers_at_same_location > 1 {
        return Some(0.0);
    }

    let (gap, same_side_count) =
        small_world_rand::removal_gap_directional(peer_signed_distance, all_signed_distances);
    let peers_on_side = (same_side_count as f64).max(1.0);
    let ratio = gap / (2.0 / (peers_on_side + 1.0));

    if ratio > TOPOLOGY_PROTECTION_THRESHOLD {
        return None;
    }
    Some(ratio)
}
/// Combines normalized routing value and topology value into one score:
/// `normalized_routing + TOPOLOGY_WEIGHT * topology_value`.
///
/// Returns `None` whenever `topology_value` returns `None`.
fn composite_score(
    normalized_routing: f64,
    peer_signed_distance: f64,
    all_signed_distances: &[f64],
    peers_at_same_location: usize,
) -> Option<f64> {
    topology_value(
        peer_signed_distance,
        all_signed_distances,
        peers_at_same_location,
    )
    .map(|topo| normalized_routing + TOPOLOGY_WEIGHT * topo)
}
/// Picks a peer to drop when the regular selection produced none.
///
/// Without an own location the first connection in map order is returned.
/// Otherwise the connection whose location has the smallest gap according
/// to `removal_gap_directional` is chosen; incomparable gaps are treated as
/// equal. Returns `None` when there are no connections.
fn select_fallback_peer_to_drop(
    neighbor_locations: &BTreeMap<Location, Vec<Connection>>,
    my_location: &Option<Location>,
) -> Option<PeerKeyLocation> {
    let own = match my_location {
        Some(own) => own,
        None => {
            return neighbor_locations
                .values()
                .flatten()
                .next()
                .map(|conn| conn.location.clone());
        }
    };

    let signed: Vec<f64> = neighbor_locations
        .keys()
        .map(|neighbor| own.signed_distance(*neighbor))
        .collect();
    // Gap reported for the given neighbor location; evaluated on demand by
    // the comparator below, exactly once per side of each comparison.
    let gap_for = |neighbor: &Location| -> f64 {
        small_world_rand::removal_gap_directional(own.signed_distance(*neighbor), &signed).0
    };

    neighbor_locations
        .iter()
        .flat_map(|(neighbor, conns)| conns.iter().map(move |conn| (neighbor, conn)))
        .min_by(|(lhs, _), (rhs, _)| {
            let lhs_gap = gap_for(*lhs);
            let rhs_gap = gap_for(*rhs);
            lhs_gap.partial_cmp(&rhs_gap).unwrap_or(Ordering::Equal)
        })
        .map(|(_, conn)| conn.location.clone())
}
/// Selects which connection evaluator decides whether a candidate
/// connection is accepted.
#[derive(PartialEq, Debug, Clone, Copy)]
pub(crate) enum ConnectionAcquisitionStrategy {
/// Decide with the slow evaluator; selected when the peer is above
/// `max_connections`.
Slow,
/// Decide with the fast evaluator; the initial strategy, also selected
/// when a connection is added because resource usage is low.
Fast,
}
#[cfg(test)]
mod tests {
/// Records many requests clustered around this peer's location and checks
/// both the max-density location reported by the cached density map and the
/// best location found by scanning the map at 0.01 steps.
#[test]
fn test_topology() {
    const NUM_REQUESTS: usize = 5_000;
    let mut topology_manager = TopologyManager::new(Limits {
        max_upstream_bandwidth: Rate::new_per_second(1000.0),
        max_downstream_bandwidth: Rate::new_per_second(1000.0),
        min_connections: 5,
        max_connections: 200,
    });
    // Ten neighbor locations at 0.0, 0.1, ..., 0.9, each without connections.
    let mut current_neighbors = std::collections::BTreeMap::new();
    for i in 0..10 {
        current_neighbors.insert(Location::new(i as f64 / 10.0), vec![]);
    }
    let this_peer_location = Location::new(0.39);
    for _ in 0..NUM_REQUESTS {
        // Two random hops away from this peer's location.
        let requested_location = random_location(&random_location(&this_peer_location));
        topology_manager.record_request(
            PeerKeyLocation::random(),
            requested_location,
            TransactionType::Get,
        );
    }
    // Fixed: the argument was the mis-encoded token `¤t_neighbors`; the
    // intended expression is a reference to `current_neighbors`.
    topology_manager
        .cached_density_map
        .set(
            &topology_manager.request_density_tracker,
            &current_neighbors,
        )
        .unwrap();
    let best_candidate_location = topology_manager
        .get_best_candidate_location(&this_peer_location)
        .unwrap();
    assert_eq!(best_candidate_location, Location::new(0.35));
    // Scan the density map at 0.01 resolution for the highest density.
    let mut best_score = 0.0;
    let mut best_location = Location::new(0.0);
    for i in 0..100 {
        let candidate_location = Location::new(i as f64 / 100.0);
        let score = topology_manager
            .cached_density_map
            .get()
            .unwrap()
            .get_density_at(candidate_location)
            .unwrap();
        if score > best_score {
            best_score = score;
            best_location = candidate_location;
        }
    }
    assert_eq!(best_location, Location::new(0.4));
}
/// Returns a location a random link distance away from
/// `this_peer_location`, on either side with equal probability, wrapped
/// into `[0, 1)`.
fn random_location(this_peer_location: &Location) -> Location {
    use crate::config::GlobalRng;
    tracing::debug!("Generating random location");
    let distance = small_world_rand::test_utils::random_link_distance(Distance::new(0.001));
    let origin = this_peer_location.as_f64();
    let shifted = match GlobalRng::random_bool(0.5) {
        true => origin - distance.as_f64(),
        false => origin + distance.as_f64(),
    };
    Location::new(shifted.rem_euclid(1.0))
}
use super::*;
use crate::ring::Distance;
use std::time::Duration;
/// A single usage report must be reflected in the attributed usage rate.
#[test_log::test]
fn test_resource_manager_report() {
    let mut resource_manager = TopologyManager::new(Limits {
        max_upstream_bandwidth: Rate::new_per_second(1000.0),
        max_downstream_bandwidth: Rate::new_per_second(1000.0),
        max_connections: 200,
        min_connections: 5,
    });
    let attribution = AttributionSource::Peer(PeerKeyLocation::random());
    let now = Instant::now();
    resource_manager.report_resource_usage(
        &attribution,
        ResourceType::InboundBandwidthBytes,
        100.0,
        now,
    );
    let observed_rate = resource_manager
        .meter
        .attributed_usage_rate(&attribution, &ResourceType::InboundBandwidthBytes, now)
        .unwrap()
        .per_second();
    assert_eq!(observed_rate, 100.0);
}
/// With usage far above the downstream limit and more peers than
/// `min_connections`, the peer with the lowest requests-per-bandwidth ratio
/// must be recommended for removal.
#[test_log::test]
fn test_remove_connections() {
let mut resource_manager = setup_topology_manager(1000.0);
let peers = generate_random_peers(6);
let bw_usage_by_peer = vec![1000, 1100, 1200, 2000, 1600, 1300];
// Reports end more than SOURCE_RAMP_UP_DURATION before "now", so every
// source is past ramp-up when the topology is adjusted below.
let report_time = Instant::now() - SOURCE_RAMP_UP_DURATION - Duration::from_secs(30);
report_resource_usage(
&mut resource_manager,
&peers,
&bw_usage_by_peer,
report_time,
);
let requests_per_peer = vec![20, 19, 18, 9, 9, 15];
report_outbound_requests(&mut resource_manager, &peers, &requests_per_peer);
// Peer 3 has the lowest ratio (9 requests for 2000 bytes).
let worst_ix = find_worst_peer(&peers, &bw_usage_by_peer, &requests_per_peer);
assert_eq!(worst_ix, 3);
let worst_peer = &peers[worst_ix];
let mut neighbor_locations = BTreeMap::new();
for peer in &peers {
neighbor_locations.insert(peer.location().unwrap(), vec![]);
}
let adjustment = resource_manager.adjust_topology(
&neighbor_locations,
&None,
Instant::now(),
peers.len(),
);
match adjustment {
TopologyAdjustment::RemoveConnections(peers) => {
assert_eq!(peers.len(), 1);
assert_eq!(peers[0], *worst_peer);
}
TopologyAdjustment::AddConnections(_)
| TopologyAdjustment::NoChange
| TopologyAdjustment::SwapConnection { .. } => {
panic!("Expected to remove a peer, adjustment was {adjustment:?}")
}
}
}
/// With low reported bandwidth usage at exactly `min_connections` peers,
/// the manager is expected to ask for exactly one additional connection.
#[test_log::test]
fn test_add_connections() {
let mut resource_manager = setup_topology_manager(1000.0);
let peers: Vec<PeerKeyLocation> = generate_random_peers(5);
let bw_usage_by_peer = vec![10, 20, 30, 25, 30];
// Reports end more than SOURCE_RAMP_UP_DURATION before "now".
let report_time = Instant::now() - SOURCE_RAMP_UP_DURATION - Duration::from_secs(30);
report_resource_usage(
&mut resource_manager,
&peers,
&bw_usage_by_peer,
report_time,
);
let requests_per_peer = vec![20, 19, 18, 9, 9];
report_outbound_requests(&mut resource_manager, &peers, &requests_per_peer);
let mut neighbor_locations = BTreeMap::new();
for peer in &peers {
neighbor_locations.insert(peer.location().unwrap(), vec![]);
}
let adjustment = resource_manager.adjust_topology(
&neighbor_locations,
&None,
Instant::now(),
peers.len(),
);
match adjustment {
TopologyAdjustment::AddConnections(locations) => {
assert_eq!(locations.len(), 1);
}
TopologyAdjustment::RemoveConnections(_)
| TopologyAdjustment::NoChange
| TopologyAdjustment::SwapConnection { .. } => {
panic!("Expected to add a connection, adjustment was {adjustment:?}")
}
}
}
/// With moderate reported usage at `min_connections` peers, no adjustment is
/// expected.
#[test_log::test]
fn test_no_adjustment() {
let mut resource_manager = setup_topology_manager(1000.0);
let peers = generate_random_peers(5);
let bw_usage_by_peer = vec![150, 200, 100, 100, 200];
let report_time = Instant::now() - SOURCE_RAMP_UP_DURATION - Duration::from_secs(30);
report_resource_usage(
&mut resource_manager,
&peers,
&bw_usage_by_peer,
report_time,
);
let requests_per_peer = vec![20, 19, 18, 9, 9];
report_outbound_requests(&mut resource_manager, &peers, &requests_per_peer);
let mut neighbor_locations = BTreeMap::new();
for peer in &peers {
neighbor_locations.insert(peer.location().unwrap(), vec![]);
}
// Unlike the sibling tests, the adjustment is evaluated at `report_time`
// rather than at `Instant::now()`.
let adjustment =
resource_manager.adjust_topology(&neighbor_locations, &None, report_time, peers.len());
match adjustment {
TopologyAdjustment::NoChange => {}
TopologyAdjustment::AddConnections(_)
| TopologyAdjustment::RemoveConnections(_)
| TopologyAdjustment::SwapConnection { .. } => {
panic!("Expected no adjustment, adjustment was {adjustment:?}")
}
}
}
/// High resource usage must never push the connection count below
/// `min_connections`: no removal at the minimum, additions below it, and
/// removal allowed only above it.
#[test_log::test]
fn test_no_removal_at_min_connections() {
let limits = Limits {
max_upstream_bandwidth: Rate::new_per_second(100000.0),
max_downstream_bandwidth: Rate::new_per_second(1000.0),
max_connections: 200,
min_connections: 5,
};
let mut resource_manager = TopologyManager::new(limits);
let peers = generate_random_peers(5);
// 5 peers x 2000 bytes per report, far above the 1000/s downstream limit.
let bw_usage_by_peer = vec![2000, 2000, 2000, 2000, 2000];
let report_time = Instant::now() - SOURCE_RAMP_UP_DURATION - Duration::from_secs(30);
report_resource_usage(
&mut resource_manager,
&peers,
&bw_usage_by_peer,
report_time,
);
let requests_per_peer = vec![5, 5, 5, 5, 5];
report_outbound_requests(&mut resource_manager, &peers, &requests_per_peer);
let mut neighbor_locations = BTreeMap::new();
for peer in &peers {
neighbor_locations.insert(peer.location().unwrap(), vec![]);
}
// Case 1: exactly at min_connections (5) — removal must not be suggested.
let adjustment = resource_manager.adjust_topology(
&neighbor_locations,
&None,
Instant::now(),
5, );
assert!(
!matches!(adjustment, TopologyAdjustment::RemoveConnections(_)),
"Should not remove connections when at min_connections, got {adjustment:?}"
);
// Case 2: below min_connections (3) — connections must be added.
let adjustment = resource_manager.adjust_topology(
&neighbor_locations,
&None,
Instant::now(),
3, );
match adjustment {
TopologyAdjustment::AddConnections(_) => {}
TopologyAdjustment::RemoveConnections(_)
| TopologyAdjustment::NoChange
| TopologyAdjustment::SwapConnection { .. } => {
panic!("Expected AddConnections when below min, got {adjustment:?}")
}
}
// Case 3: above min_connections (6) — removal is allowed again.
let adjustment = resource_manager.adjust_topology(
&neighbor_locations,
&None,
Instant::now(),
6, );
assert!(
matches!(adjustment, TopologyAdjustment::RemoveConnections(_)),
"Should allow removal when above min_connections, got {adjustment:?}"
);
}
/// With zero connections and a known own location, the single bootstrap
/// target must be the own location.
#[test_log::test]
fn test_no_peers() {
let mut resource_manager = setup_topology_manager(1000.0);
let neighbor_locations = BTreeMap::new();
let my_location = Location::new(0.5);
let adjustment = resource_manager.adjust_topology(
&neighbor_locations,
&Some(my_location),
Instant::now(),
0,
);
match adjustment {
TopologyAdjustment::AddConnections(v) => {
assert_eq!(v.len(), 1);
assert_eq!(
v[0], my_location,
"First bootstrap target should be own location"
);
}
TopologyAdjustment::RemoveConnections(_)
| TopologyAdjustment::NoChange
| TopologyAdjustment::SwapConnection { .. } => {
panic!("Expected AddConnections, but was: {adjustment:?}")
}
}
}
/// When low resource usage triggers an add, most of the proposed targets
/// over repeated trials must lie within distance 0.3 of the own location.
#[test_log::test]
fn test_resource_based_add_uses_gap_targets() {
// Seed the global RNG so the sampled targets are reproducible.
let _guard = crate::config::GlobalRng::seed_guard(0xBEEF_CAFE);
let mut resource_manager = setup_topology_manager(1000.0);
let peers: Vec<PeerKeyLocation> = generate_random_peers(6);
let bw_usage_by_peer = vec![5, 5, 5, 5, 5, 5];
let report_time = Instant::now() - SOURCE_RAMP_UP_DURATION - Duration::from_secs(30);
report_resource_usage(
&mut resource_manager,
&peers,
&bw_usage_by_peer,
report_time,
);
let mut neighbor_locations = BTreeMap::new();
for peer in &peers {
neighbor_locations.insert(peer.location().unwrap(), vec![]);
}
// The own location is the location of the first (lowest-located) peer.
let my_location = peers[0].location().unwrap();
let mut close_count = 0;
let trials = 20;
for _ in 0..trials {
let adjustment = resource_manager.adjust_topology(
&neighbor_locations,
&Some(my_location),
Instant::now(),
peers.len(),
);
match adjustment {
TopologyAdjustment::AddConnections(locations) => {
assert_eq!(locations.len(), 1);
let dist_to_me = my_location.distance(locations[0]).as_f64();
if dist_to_me < 0.3 {
close_count += 1;
}
}
TopologyAdjustment::RemoveConnections(_)
| TopologyAdjustment::NoChange
| TopologyAdjustment::SwapConnection { .. } => {
panic!("Expected AddConnections, got {adjustment:?}")
}
}
}
assert!(
close_count > trials / 2,
"Expected most targets near my_location, got {close_count}/{trials} close"
);
}
/// Builds a manager with the given downstream bandwidth limit, a generous
/// upstream limit (100000/s) and connection bounds of 5..=200.
fn setup_topology_manager(max_downstream_rate: f64) -> TopologyManager {
    TopologyManager::new(Limits {
        max_upstream_bandwidth: Rate::new_per_second(100000.0),
        max_downstream_bandwidth: Rate::new_per_second(max_downstream_rate),
        max_connections: 200,
        min_connections: 5,
    })
}
/// Generates `num_peers` random peers sorted by ascending location.
///
/// # Panics
/// Panics if a peer has no location or two locations cannot be compared.
fn generate_random_peers(num_peers: usize) -> Vec<PeerKeyLocation> {
    let mut peers = Vec::with_capacity(num_peers);
    for _ in 0..num_peers {
        peers.push(PeerKeyLocation::random());
    }
    peers.sort_by(|left, right| {
        let left_location = left.location().unwrap();
        let right_location = right.location().unwrap();
        left_location.partial_cmp(&right_location).unwrap()
    });
    peers
}
/// Reports inbound bandwidth usage for every peer once per second over the
/// 599 seconds leading up to (but excluding) `up_to_time`, oldest first.
///
/// `bw_usage_by_peer[i]` is the amount reported each second for `peers[i]`.
fn report_resource_usage(
    resource_manager: &mut TopologyManager,
    peers: &[PeerKeyLocation],
    bw_usage_by_peer: &[usize],
    up_to_time: Instant,
) {
    for (peer_ix, peer) in peers.iter().enumerate() {
        // 599 seconds ago down to 1 second ago.
        for seconds_ago in (1..600u64).rev() {
            let report_time = up_to_time - Duration::from_secs(seconds_ago);
            let bytes = bw_usage_by_peer[peer_ix];
            tracing::trace!(
                "Reporting {} bytes of inbound bandwidth for peer {:?} at {:?}",
                bytes,
                peer,
                report_time
            );
            resource_manager.report_resource_usage(
                &AttributionSource::Peer(peer.clone()),
                ResourceType::InboundBandwidthBytes,
                bytes as f64,
                report_time,
            );
        }
    }
}
/// Records `requests_per_peer[i]` outbound requests to `peers[i]`, each
/// targeting that peer's own location.
fn report_outbound_requests(
    resource_manager: &mut TopologyManager,
    peers: &[PeerKeyLocation],
    requests_per_peer: &[usize],
) {
    for (peer_ix, &request_count) in requests_per_peer.iter().enumerate() {
        (0..request_count).for_each(|_| {
            let peer = &peers[peer_ix];
            resource_manager.report_outbound_request(peer.clone(), peer.location().unwrap());
        });
    }
}
/// Returns the index of the peer with the lowest requests-per-bandwidth
/// ratio; the first such peer wins ties, and 0 is returned for no peers.
fn find_worst_peer(
    peers: &[PeerKeyLocation],
    bw_usage_by_peer: &[usize],
    requests_per_peer: &[usize],
) -> usize {
    let values: Vec<f64> = (0..peers.len())
        .map(|ix| {
            let value = requests_per_peer[ix] as f64 / bw_usage_by_peer[ix] as f64;
            debug!(
                "Peer {:?} has value {}/{} = {}",
                peers[ix], requests_per_peer[ix], bw_usage_by_peer[ix], value
            );
            value
        })
        .collect();
    // Strict `<` keeps the earliest index among equal values.
    values
        .iter()
        .enumerate()
        .fold(0, |worst_ix, (ix, value)| {
            if *value < values[worst_ix] {
                ix
            } else {
                worst_ix
            }
        })
}
/// `update_limits` must replace the bandwidth limits held by the manager.
#[test]
fn test_update_limits() {
    // Limits that differ only in the (shared) bandwidth value.
    let limits_with_bandwidth = |bandwidth: f64| Limits {
        max_upstream_bandwidth: Rate::new_per_second(bandwidth),
        max_downstream_bandwidth: Rate::new_per_second(bandwidth),
        max_connections: 200,
        min_connections: 5,
    };
    let mut topology_manager = TopologyManager::new(limits_with_bandwidth(1000.0));
    topology_manager.update_limits(limits_with_bandwidth(2000.0));
    assert_eq!(
        topology_manager.limits.max_upstream_bandwidth,
        Rate::new_per_second(2000.0)
    );
    assert_eq!(
        topology_manager.limits.max_downstream_bandwidth,
        Rate::new_per_second(2000.0)
    );
}
/// With one connection and `min_connections` of 25, `adjust_topology` must ask for 24 distinct
/// targets, the first of which is the node's own location.
#[test_log::test]
fn test_below_threshold_uses_bootstrap_targets() {
    let mut topology_manager = TopologyManager::new(Limits {
        max_upstream_bandwidth: Rate::new_per_second(1000.0),
        max_downstream_bandwidth: Rate::new_per_second(1000.0),
        max_connections: 200,
        min_connections: 25,
    });

    // A single neighbour location with an empty connection list.
    let neighbor = PeerKeyLocation::random();
    let neighbor_locations = BTreeMap::from([(neighbor.location().unwrap(), vec![])]);

    let my_location = Location::new(0.5);
    let adjustment = topology_manager.adjust_topology(
        &neighbor_locations,
        &Some(my_location),
        Instant::now(),
        1,
    );

    let locations = match adjustment {
        TopologyAdjustment::AddConnections(targets) => targets,
        TopologyAdjustment::RemoveConnections(_)
        | TopologyAdjustment::NoChange
        | TopologyAdjustment::SwapConnection { .. } => {
            panic!("Expected AddConnections, got {adjustment:?}")
        }
    };
    assert_eq!(locations.len(), 24);
    assert_eq!(locations[0], my_location);
    let distinct = locations
        .iter()
        .copied()
        .collect::<std::collections::BTreeSet<Location>>();
    assert_eq!(
        distinct.len(),
        locations.len(),
        "All bootstrap targets must be distinct"
    );
}
/// With five connections and `min_connections` of 25, the test expects 20 new targets, more
/// than a third of which lie within distance 0.1 of our own location.
#[test_log::test]
fn test_above_threshold_uses_kleinberg_targets() {
// Fixed seed; presumably pins the global RNG for as long as `_guard` is alive so that
// the sampled targets (and hence the short-distance count) are reproducible.
let _guard = crate::config::GlobalRng::seed_guard(0xBEEF_CAFE);
let limits = Limits {
max_upstream_bandwidth: Rate::new_per_second(1000.0),
max_downstream_bandwidth: Rate::new_per_second(1000.0),
max_connections: 200,
min_connections: 25,
};
let mut topology_manager = TopologyManager::new(limits);
// Five random neighbour locations, each with an empty connection list.
let mut neighbor_locations = BTreeMap::new();
for _ in 0..5 {
let peer = PeerKeyLocation::random();
neighbor_locations.insert(peer.location().unwrap(), vec![]);
}
let my_location = Location::new(0.5);
// The last argument (5) matches the number of neighbours inserted above.
let adjustment = topology_manager.adjust_topology(
&neighbor_locations,
&Some(my_location),
Instant::now(),
5, );
match adjustment {
TopologyAdjustment::AddConnections(locations) => {
// 25 required minus the 5 reported connections.
assert_eq!(locations.len(), 20);
// Count targets closer than 0.1 to our own location.
let short_count = locations
.iter()
.filter(|loc| my_location.distance(**loc).as_f64() < 0.1)
.count();
assert!(
short_count > locations.len() / 3,
"Expected short-distance bias, got {short_count}/{} close",
locations.len()
);
}
// Every other variant is listed explicitly rather than through a wildcard arm.
TopologyAdjustment::RemoveConnections(_)
| TopologyAdjustment::NoChange
| TopologyAdjustment::SwapConnection { .. } => {
panic!("Expected AddConnections, got {adjustment:?}")
}
}
}
/// With four connections and `min_connections` of 5, exactly one target is requested and it is
/// the node's own location.
#[test_log::test]
fn test_single_bootstrap_target() {
    let mut topology_manager = TopologyManager::new(Limits {
        max_upstream_bandwidth: Rate::new_per_second(1000.0),
        max_downstream_bandwidth: Rate::new_per_second(1000.0),
        max_connections: 200,
        min_connections: 5,
    });

    // Four random neighbour locations, each with an empty connection list.
    let neighbor_locations: BTreeMap<_, _> = (0..4)
        .map(|_| {
            let peer = PeerKeyLocation::random();
            (peer.location().unwrap(), vec![])
        })
        .collect();

    let my_location = Location::new(0.25);
    let adjustment = topology_manager.adjust_topology(
        &neighbor_locations,
        &Some(my_location),
        Instant::now(),
        4,
    );

    let locations = match adjustment {
        TopologyAdjustment::AddConnections(targets) => targets,
        TopologyAdjustment::RemoveConnections(_)
        | TopologyAdjustment::NoChange
        | TopologyAdjustment::SwapConnection { .. } => {
            panic!("Expected AddConnections, got {adjustment:?}")
        }
    };
    assert_eq!(locations.len(), 1);
    assert_eq!(locations[0], my_location);
}
/// Starting from location 0.9, every requested target must stay inside `[0, 1)` and all targets
/// must be distinct.
#[test_log::test]
fn test_bootstrap_targets_wrap_near_boundary() {
    let mut topology_manager = TopologyManager::new(Limits {
        max_upstream_bandwidth: Rate::new_per_second(1000.0),
        max_downstream_bandwidth: Rate::new_per_second(1000.0),
        max_connections: 200,
        min_connections: 5,
    });

    // A single neighbour location with an empty connection list.
    let neighbor = PeerKeyLocation::random();
    let neighbor_locations = BTreeMap::from([(neighbor.location().unwrap(), vec![])]);

    let my_location = Location::new(0.9);
    let adjustment = topology_manager.adjust_topology(
        &neighbor_locations,
        &Some(my_location),
        Instant::now(),
        1,
    );

    let locations = match adjustment {
        TopologyAdjustment::AddConnections(targets) => targets,
        TopologyAdjustment::RemoveConnections(_)
        | TopologyAdjustment::NoChange
        | TopologyAdjustment::SwapConnection { .. } => {
            panic!("Expected AddConnections, got {adjustment:?}")
        }
    };
    locations.iter().map(|loc| loc.as_f64()).for_each(|v| {
        assert!(
            (0.0..1.0).contains(&v),
            "Location {v} outside valid ring range [0, 1)"
        );
    });
    let distinct = locations
        .iter()
        .copied()
        .collect::<std::collections::BTreeSet<Location>>();
    assert_eq!(
        distinct.len(),
        locations.len(),
        "All bootstrap targets must be distinct even when wrapping"
    );
}
/// Without an own location (`None`), 24 targets are still requested and they must not all be
/// the same location.
#[test_log::test]
fn test_no_location_falls_back_to_random() {
    let mut topology_manager = TopologyManager::new(Limits {
        max_upstream_bandwidth: Rate::new_per_second(1000.0),
        max_downstream_bandwidth: Rate::new_per_second(1000.0),
        max_connections: 200,
        min_connections: 25,
    });

    // A single neighbour location with an empty connection list.
    let neighbor = PeerKeyLocation::random();
    let neighbor_locations = BTreeMap::from([(neighbor.location().unwrap(), vec![])]);

    let adjustment =
        topology_manager.adjust_topology(&neighbor_locations, &None, Instant::now(), 1);

    let locations = match adjustment {
        TopologyAdjustment::AddConnections(targets) => targets,
        TopologyAdjustment::RemoveConnections(_)
        | TopologyAdjustment::NoChange
        | TopologyAdjustment::SwapConnection { .. } => {
            panic!("Expected AddConnections, got {adjustment:?}")
        }
    };
    assert_eq!(locations.len(), 24);
    let unique_count = locations
        .iter()
        .collect::<std::collections::HashSet<_>>()
        .len();
    assert!(
        unique_count > 1,
        "Random fallback should produce diverse locations, got {} unique out of {}",
        unique_count,
        locations.len()
    );
}
/// `bootstrap_target_locations(&Some(loc), 0, 5)` must return exactly one target: `loc` itself.
#[test]
fn test_bootstrap_zero_connections_with_location() {
    let own_location = Location::new(0.4);
    let targets = bootstrap_target_locations(&Some(own_location), 0, 5);
    let target_count = targets.len();
    assert_eq!(
        target_count, 1,
        "Zero connections should produce exactly one target"
    );
    assert_eq!(targets[0], own_location, "Should target own location");
}
/// `bootstrap_target_locations(&None, 0, 5)` must return exactly one target.
#[test]
fn test_bootstrap_zero_connections_without_location() {
    let target_count = bootstrap_target_locations(&None, 0, 5).len();
    assert_eq!(
        target_count, 1,
        "Zero connections should produce exactly one target"
    );
}
/// With one connection and four targets requested, the first target is our own location and
/// target `i` sits at an offset of `i / needed` from it; all targets are distinct.
#[test]
fn test_bootstrap_few_connections_with_location() {
    let needed = 4;
    let my_loc = Location::new(0.2);
    let locations = bootstrap_target_locations(&Some(my_loc), 1, needed);

    assert_eq!(locations.len(), needed);
    assert_eq!(locations[0], my_loc);
    let distinct_count = locations
        .iter()
        .collect::<std::collections::HashSet<_>>()
        .len();
    assert_eq!(
        distinct_count, needed,
        "All bootstrap targets should be distinct"
    );

    // Index 0 was checked above; the remaining targets must be evenly spaced.
    locations.iter().enumerate().skip(1).for_each(|(i, loc)| {
        let expected_offset = i as f64 / needed as f64;
        let expected = Location::new_rounded(my_loc.as_f64() + expected_offset);
        assert_eq!(*loc, expected, "Target {i} should be evenly spaced");
    });
}
/// Without an own location, three targets are returned and they are not all identical.
#[test]
fn test_bootstrap_few_connections_without_location() {
    let needed = 3;
    let targets = bootstrap_target_locations(&None, 2, needed);
    assert_eq!(targets.len(), needed);

    let distinct_count = targets
        .iter()
        .collect::<std::collections::HashSet<_>>()
        .len();
    assert!(
        distinct_count > 1,
        "Random locations should generally be distinct"
    );
}
/// With ten peers packed into a narrow band next to our location, `maybe_swap_connection`
/// must propose a swap at least once in 100 calls, but in fewer than half of them.
#[test_log::test]
fn test_topology_swap_triggers_on_clustered_connections() {
// Fixed seed; presumably pins the global RNG while `_guard` is alive so the swap count
// checked below is reproducible.
let _guard = crate::config::GlobalRng::seed_guard(0xABCD_1234);
let limits = Limits {
max_upstream_bandwidth: Rate::new_per_second(100000.0),
max_downstream_bandwidth: Rate::new_per_second(100000.0),
max_connections: 200,
min_connections: 10,
};
let mut tm = TopologyManager::new(limits);
let my_location = Location::new(0.5);
let mut neighbor_locations: BTreeMap<Location, Vec<Connection>> = BTreeMap::new();
// Ten peers at offsets 0.010, 0.011, ..., 0.019 from our location (0.510..=0.519),
// each with exactly one recorded outbound request.
for i in 0..10 {
let offset = 0.01 + (i as f64 * 0.001);
let loc = Location::new(my_location.as_f64() + offset);
let peer = PeerKeyLocation::random();
tm.outbound_request_counter.record_request(peer.clone());
neighbor_locations
.entry(loc)
.or_default()
.push(Connection::new(peer));
}
// Count how many of the 100 calls return a swap.
let mut swap_count = 0;
let trials = 100;
for _ in 0..trials {
let adjustment = tm.maybe_swap_connection(&Some(my_location), &neighbor_locations, 10);
if matches!(adjustment, TopologyAdjustment::SwapConnection { .. }) {
swap_count += 1;
}
}
assert!(
swap_count > 0,
"Expected at least one swap with clustered connections, got 0/{trials}"
);
// Upper bound: swaps must occur in fewer than half of the calls.
assert!(
swap_count < trials / 2,
"Too many swaps ({swap_count}/{trials}) — probability should be capped"
);
}
/// With peers spread over distances from roughly 0.002 up to almost 0.5 on both sides of our
/// location, `maybe_swap_connection` must propose a swap in at most 3 of 100 calls.
#[test_log::test]
fn test_topology_swap_no_trigger_when_well_distributed() {
// Fixed seed; presumably pins the global RNG while `_guard` is alive so the swap count
// checked below is reproducible.
let _guard = crate::config::GlobalRng::seed_guard(0x600D_7090);
let limits = Limits {
max_upstream_bandwidth: Rate::new_per_second(100000.0),
max_downstream_bandwidth: Rate::new_per_second(100000.0),
max_connections: 200,
min_connections: 10,
};
let mut tm = TopologyManager::new(limits);
let my_location = Location::new(0.5);
// `d_at(u)` interpolates geometrically between 0.001 (u = 0) and 0.5 (u = 1):
// 0.001 * 500^u.
let d_at = |u: f64| 0.001_f64 * (0.5_f64 / 0.001).powf(u); let mut neighbor_locations: BTreeMap<Location, Vec<Connection>> = BTreeMap::new();
for i in 0..10 {
// Even `i` places the peer on the positive side, odd `i` on the negative side.
// NOTE(review): `u = (i + 0.5) / 5` exceeds 1 for i >= 5 and is clamped to 0.999, so
// peers 5..=9 all sit at distance d_at(0.999) and share just two map keys (one per
// side). Confirm this is intended rather than a per-side index such as `i / 2`.
let u = (i as f64 + 0.5) / 5.0; let dist = d_at(u.min(0.999)); let sign = if i % 2 == 0 { 1.0 } else { -1.0 };
let loc = Location::new_rounded(my_location.as_f64() + sign * dist);
let peer = PeerKeyLocation::random();
// Each peer gets exactly one recorded outbound request.
tm.outbound_request_counter.record_request(peer.clone());
neighbor_locations
.entry(loc)
.or_default()
.push(Connection::new(peer));
}
// Count how many of the 100 calls return a swap.
let mut swap_count = 0;
let trials = 100;
for _ in 0..trials {
let adjustment = tm.maybe_swap_connection(&Some(my_location), &neighbor_locations, 10);
if matches!(adjustment, TopologyAdjustment::SwapConnection { .. }) {
swap_count += 1;
}
}
assert!(
swap_count <= 3,
"Well-distributed topology should rarely trigger swaps, got {swap_count}/{trials}"
);
}
/// Among six clustered peers, the one with no recorded outbound requests must be the peer a
/// swap removes.
#[test_log::test]
fn test_topology_swap_removes_least_valuable_composite() {
// Fixed seed; presumably pins the global RNG while `_guard` is alive so a swap is
// reliably observed within the 200 attempts below.
let _guard = crate::config::GlobalRng::seed_guard(0x1EA5_70FE);
let limits = Limits {
max_upstream_bandwidth: Rate::new_per_second(100000.0),
max_downstream_bandwidth: Rate::new_per_second(100000.0),
max_connections: 200,
min_connections: 5,
};
let mut tm = TopologyManager::new(limits);
let my_location = Location::new(0.5);
let mut neighbor_locations: BTreeMap<Location, Vec<Connection>> = BTreeMap::new();
let mut least_routed_peer = None;
// Six peers at offsets 0.010..=0.015 from our location. Peer 3 records no requests;
// every other peer records 50.
for i in 0..6 {
let loc = Location::new(my_location.as_f64() + 0.01 + (i as f64 * 0.001));
let peer = PeerKeyLocation::random();
if i == 3 {
least_routed_peer = Some(peer.clone());
} else {
for _ in 0..50 {
tm.outbound_request_counter.record_request(peer.clone());
}
}
neighbor_locations
.entry(loc)
.or_default()
.push(Connection::new(peer));
}
let expected_drop = least_routed_peer.unwrap();
// Retry until the first swap appears, check which peer it removes, then stop.
let mut found_swap = false;
for _ in 0..200 {
let adjustment = tm.maybe_swap_connection(&Some(my_location), &neighbor_locations, 6);
if let TopologyAdjustment::SwapConnection { remove, .. } = adjustment {
assert_eq!(
remove, expected_drop,
"Should drop the peer with lowest composite score"
);
found_swap = true;
break;
}
}
assert!(found_swap, "Should have triggered a swap within 200 trials");
}
/// Swaps must be possible when the connection count passed in equals `min_connections` (10)
/// and must never happen when it is one lower (9).
#[test_log::test]
fn test_topology_swap_works_at_exactly_min_connections() {
// Fixed seed; presumably pins the global RNG while `_guard` is alive so the swap counts
// checked below are reproducible.
let _guard = crate::config::GlobalRng::seed_guard(0xDEAD_BEEF);
let limits = Limits {
max_upstream_bandwidth: Rate::new_per_second(100000.0),
max_downstream_bandwidth: Rate::new_per_second(100000.0),
max_connections: 200,
min_connections: 10,
};
let mut tm = TopologyManager::new(limits);
let my_location = Location::new(0.5);
let mut neighbor_locations: BTreeMap<Location, Vec<Connection>> = BTreeMap::new();
// Ten peers at offsets 0.010..=0.019 from our location, one recorded request each.
for i in 0..10 {
let offset = 0.01 + (i as f64 * 0.001);
let loc = Location::new(my_location.as_f64() + offset);
let peer = PeerKeyLocation::random();
tm.outbound_request_counter.record_request(peer.clone());
neighbor_locations
.entry(loc)
.or_default()
.push(Connection::new(peer));
}
// Phase 1: third argument is 10, equal to `min_connections`.
let mut swap_count = 0;
let trials = 100;
for _ in 0..trials {
let adjustment = tm.maybe_swap_connection(&Some(my_location), &neighbor_locations, 10);
if matches!(adjustment, TopologyAdjustment::SwapConnection { .. }) {
swap_count += 1;
}
}
assert!(
swap_count > 0,
"Swaps must work at exactly min_connections (got 0/{trials})"
);
// Phase 2: same neighbour map, but the third argument is 9, one below `min_connections`.
let mut swap_below = 0;
for _ in 0..trials {
let adjustment = tm.maybe_swap_connection(&Some(my_location), &neighbor_locations, 9);
if matches!(adjustment, TopologyAdjustment::SwapConnection { .. }) {
swap_below += 1;
}
}
assert_eq!(
swap_below, 0,
"Swaps must not fire below min_connections (got {swap_below}/{trials})"
);
}
/// When `adjust_topology` removes a connection here, it must remove exactly one peer and that
/// peer must not be the lone peer at distance 0.4, even though it has zero recorded requests.
#[test_log::test]
fn test_topology_protection_prevents_pruning_critical_connection() {
// Fixed seed; presumably pins the global RNG while `_guard` is alive.
let _guard = crate::config::GlobalRng::seed_guard(0x7090_CAFE);
let mut resource_manager = setup_topology_manager(1000.0);
let my_location = Location::new(0.5);
// `peers` keeps insertion order so it lines up index-by-index with `bw_usage` and
// `requests` below.
let mut peers = Vec::new();
let mut neighbor_locations: BTreeMap<Location, Vec<Connection>> = BTreeMap::new();
// Indices 0..=4: a tight cluster at offsets 0.010, 0.012, ..., 0.018.
for i in 0..5 {
let loc = Location::new(my_location.as_f64() + 0.01 + (i as f64 * 0.002));
let peer = PeerKeyLocation::random();
neighbor_locations
.entry(loc)
.or_default()
.push(Connection::new(peer.clone()));
peers.push(peer);
}
// Index 5: the isolated peer at offset +0.4.
let isolated_loc = Location::new(my_location.as_f64() + 0.4);
let isolated_peer = PeerKeyLocation::random();
neighbor_locations
.entry(isolated_loc)
.or_default()
.push(Connection::new(isolated_peer.clone()));
peers.push(isolated_peer.clone());
// Indices 6..=8: peers on the other side at offsets -0.05, -0.15 and -0.25.
for i in 0..3 {
let loc = Location::new_rounded(my_location.as_f64() - 0.05 - (i as f64 * 0.1));
let peer = PeerKeyLocation::random();
neighbor_locations
.entry(loc)
.or_default()
.push(Connection::new(peer.clone()));
peers.push(peer);
}
// Usage samples end 30 seconds before (now - SOURCE_RAMP_UP_DURATION).
let report_time = Instant::now() - SOURCE_RAMP_UP_DURATION - Duration::from_secs(30);
// Every peer reports 2000 bytes per sample; the manager was set up with the value 1000.0.
let bw_usage = vec![2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000];
report_resource_usage(&mut resource_manager, &peers, &bw_usage, report_time);
// Index 5 (the isolated peer) is the only one with zero outbound requests.
let requests = vec![10, 10, 10, 10, 10, 0, 10, 10, 10];
report_outbound_requests(&mut resource_manager, &peers, &requests);
let adjustment = resource_manager.adjust_topology(
&neighbor_locations,
&Some(my_location),
Instant::now(),
peers.len(),
);
match &adjustment {
TopologyAdjustment::RemoveConnections(removed) => {
assert_eq!(removed.len(), 1);
assert_ne!(
removed[0], isolated_peer,
"Should not prune topology-critical peer (isolated at long distance)"
);
}
// Every other variant is listed explicitly rather than through a wildcard arm.
other @ TopologyAdjustment::AddConnections(_)
| other @ TopologyAdjustment::SwapConnection { .. }
| other @ TopologyAdjustment::NoChange => {
panic!("Expected RemoveConnections under resource pressure, got {other:?}");
}
}
}
/// For distance lists of two, one and zero entries, `topology_value` returns `Some(1.0)`.
#[test]
fn test_topology_value_few_connections_returns_default() {
    let sparse_cases: [&[f64]; 3] = [&[0.1, 0.2], &[0.1], &[]];
    for distances in sparse_cases {
        assert_eq!(topology_value(0.1, distances, 1), Some(1.0));
    }
}
/// With a third argument of 2 or 3, `topology_value` returns `Some(0.0)`.
/// NOTE(review): going by the test name, the third argument is presumably the number of peers
/// sharing the location — confirm against `topology_value`.
#[test]
fn test_topology_value_collocated_peers_returns_zero() {
    let distances = [0.01, 0.05, 0.1, 0.2];
    for (own_distance, third_arg) in [(0.01, 2), (0.1, 3)] {
        assert_eq!(
            topology_value(own_distance, &distances, third_arg),
            Some(0.0)
        );
    }
}
/// `topology_value` yields `Some(_)` for a peer in a small symmetric layout, and `None` for
/// the only long-range peer on a side otherwise filled with very short distances.
#[test]
fn test_topology_value_critical_peer_returns_none() {
    /// Distance `0.001 * 500^u`.
    fn d_at(u: f64) -> f64 {
        0.001_f64 * 500.0_f64.powf(u)
    }

    let symmetric = [d_at(0.2), d_at(0.8), -d_at(0.2), -d_at(0.8)];
    assert!(topology_value(d_at(0.2), &symmetric, 1).is_some());

    // Positive side: three very short distances plus a single long one at u = 0.95.
    let lopsided = [
        d_at(0.01),
        d_at(0.02),
        d_at(0.03),
        d_at(0.95),
        -d_at(0.1),
        -d_at(0.3),
        -d_at(0.6),
        -d_at(0.9),
    ];
    assert!(topology_value(d_at(0.95), &lopsided, 1).is_none());
}
/// `composite_score` returns `None` for the lone long-range peer whether its first argument is
/// 1.0 or 0.0.
#[test]
fn test_composite_score_returns_none_for_critical() {
    /// Distance `0.001 * 500^u`.
    fn d_at(u: f64) -> f64 {
        0.001_f64 * 500.0_f64.powf(u)
    }

    // Positive side: three very short distances plus a single long one at u = 0.95.
    let distances = [
        d_at(0.01),
        d_at(0.02),
        d_at(0.03),
        d_at(0.95),
        -d_at(0.1),
        -d_at(0.3),
        -d_at(0.6),
        -d_at(0.9),
    ];
    let lone_long_range = d_at(0.95);
    for first_arg in [1.0, 0.0] {
        assert!(composite_score(first_arg, lone_long_range, &distances, 1).is_none());
    }
}
/// For the same peer, a first argument of 1.0 must score strictly higher than 0.0, and the
/// gap between the two scores must not exceed 1.0 (plus `f64::EPSILON`).
#[test]
fn test_composite_score_balances_routing_and_topology() {
    /// Distance `0.001 * 500^u`.
    fn d_at(u: f64) -> f64 {
        0.001_f64 * 500.0_f64.powf(u)
    }

    let distances = [d_at(0.2), d_at(0.4), d_at(0.6), d_at(0.8)];
    let score_for = |first_arg: f64| composite_score(first_arg, d_at(0.4), &distances, 1).unwrap();
    let score_high_routing = score_for(1.0);
    let score_low_routing = score_for(0.0);
    assert!(
        score_high_routing > score_low_routing,
        "High routing ({score_high_routing}) should score higher than low ({score_low_routing})"
    );

    let diff = score_high_routing - score_low_routing;
    let max_allowed_gap = 1.0 + f64::EPSILON;
    assert!(
        diff <= max_allowed_gap,
        "Routing component should be at most 1.0, got diff={diff}"
    );
}
/// With two peers close to our location (0.51, 0.512) and one far away (0.9),
/// `select_fallback_peer_to_drop` must pick a peer, and not the far one.
#[test]
fn test_fallback_peer_to_drop_uses_topology() {
    let my_loc = Location::new(0.5);
    let close_peer_1 = PeerKeyLocation::random();
    let close_peer_2 = PeerKeyLocation::random();
    let far_peer = PeerKeyLocation::random();

    let neighbor_locations = BTreeMap::from([
        (Location::new(0.51), vec![Connection::new(close_peer_1)]),
        (Location::new(0.512), vec![Connection::new(close_peer_2)]),
        (Location::new(0.9), vec![Connection::new(far_peer.clone())]),
    ]);

    let dropped = select_fallback_peer_to_drop(&neighbor_locations, &Some(my_loc))
        .expect("Should select a peer to drop");
    assert_ne!(
        dropped, far_peer,
        "Should NOT drop the distant peer (topology-critical unique long-range link)"
    );
}
/// With 20 lightly used peers, `adjust_topology` must add connections when told there are 15
/// connections, and make no change when told there are 20 or 25.
#[test_log::test]
fn test_low_usage_cap_prevents_excessive_connects() {
    let mut resource_manager = TopologyManager::new(Limits {
        max_upstream_bandwidth: Rate::new_per_second(100000.0),
        max_downstream_bandwidth: Rate::new_per_second(1000.0),
        max_connections: 200,
        min_connections: 10,
    });
    let peers = generate_random_peers(20);

    // Five bytes per sample and a single outbound request for each of the 20 peers; samples
    // end 30 seconds before (now - SOURCE_RAMP_UP_DURATION).
    let bw_usage_by_peer: Vec<usize> = vec![5; 20];
    let report_time = Instant::now() - SOURCE_RAMP_UP_DURATION - Duration::from_secs(30);
    report_resource_usage(&mut resource_manager, &peers, &bw_usage_by_peer, report_time);
    let requests_per_peer: Vec<usize> = vec![1; 20];
    report_outbound_requests(&mut resource_manager, &peers, &requests_per_peer);

    let neighbor_locations: BTreeMap<_, _> = peers
        .iter()
        .map(|peer| (peer.location().unwrap(), vec![]))
        .collect();

    // Same neighbour map and no own location each time; only the connection count varies.
    let mut adjust_at = |current_connections| {
        resource_manager.adjust_topology(
            &neighbor_locations,
            &None,
            Instant::now(),
            current_connections,
        )
    };

    let adjustment = adjust_at(15);
    assert!(
        matches!(adjustment, TopologyAdjustment::AddConnections(_)),
        "Should add connections when below low-usage cap, got {adjustment:?}"
    );

    let adjustment = adjust_at(20);
    assert!(
        matches!(adjustment, TopologyAdjustment::NoChange),
        "Should not add connections at low-usage cap, got {adjustment:?}"
    );

    let adjustment = adjust_at(25);
    assert!(
        matches!(adjustment, TopologyAdjustment::NoChange),
        "Should not add connections above low-usage cap, got {adjustment:?}"
    );
}
}
/// A change to the set of connections, as returned by `adjust_topology` and
/// `maybe_swap_connection`.
#[derive(Debug, Clone)]
pub(crate) enum TopologyAdjustment {
/// Add connections; carries one target location per connection requested.
AddConnections(Vec<Location>),
/// Remove the connections to the listed peers.
RemoveConnections(Vec<PeerKeyLocation>),
/// Replace a single connection.
SwapConnection {
/// Peer whose connection is to be dropped.
remove: PeerKeyLocation,
/// Location for the replacement connection (as the field name indicates).
add_location: Location,
},
/// Nothing to add, remove or swap.
NoChange,
}
/// Usage expressed as rates: one overall figure plus a breakdown keyed by attribution source.
struct Usage {
/// Overall rate (presumably the sum over all sources — confirm where `Usage` is built).
total: Rate,
// NOTE(review): the `allow` suggests this field is written but never read; record why it
// is kept, or remove it, once that is confirmed.
#[allow(unused)]
/// Rate attributed to each individual source.
per_source: BTreeMap<AttributionSource, Rate>,
}
/// Bandwidth and connection-count bounds passed to `TopologyManager::new` and
/// `update_limits`.
pub(crate) struct Limits {
/// Maximum rate for `ResourceType::OutboundBandwidthBytes` (see `Limits::get`).
pub max_upstream_bandwidth: Rate,
/// Maximum rate for `ResourceType::InboundBandwidthBytes` (see `Limits::get`).
pub max_downstream_bandwidth: Rate,
/// Minimum number of connections; the tests in this file expect `adjust_topology` to
/// request more connections while the current count is below this value.
pub min_connections: usize,
/// Maximum number of connections (presumably an upper bound enforced elsewhere in the
/// file — not exercised in this part of it).
pub max_connections: usize,
}
impl Limits {
    /// Returns the configured maximum rate for `resource_type`: the downstream maximum for
    /// inbound bandwidth and the upstream maximum for outbound bandwidth.
    pub fn get(&self, resource_type: &ResourceType) -> Rate {
        match *resource_type {
            ResourceType::InboundBandwidthBytes => self.max_downstream_bandwidth,
            ResourceType::OutboundBandwidthBytes => self.max_upstream_bandwidth,
        }
    }
}