use dashmap::DashMap;
use freenet_stdlib::prelude::{ContractInstanceId, ContractKey};
use std::{
cell::RefCell,
collections::{HashMap, HashSet},
net::SocketAddr,
sync::LazyLock,
};
static TOPOLOGY_REGISTRY: LazyLock<DashMap<(String, SocketAddr), TopologySnapshot>> =
LazyLock::new(DashMap::new);
thread_local! {
static CURRENT_NETWORK_NAME: RefCell<Option<String>> = const { RefCell::new(None) };
}
pub fn set_current_network_name(name: &str) {
CURRENT_NETWORK_NAME.with(|n| *n.borrow_mut() = Some(name.to_string()));
}
pub fn get_current_network_name() -> Option<String> {
CURRENT_NETWORK_NAME.with(|n| n.borrow().clone())
}
pub fn clear_current_network_name() {
CURRENT_NETWORK_NAME.with(|n| *n.borrow_mut() = None);
}
#[derive(Debug, Clone)]
pub struct ContractSubscription {
pub contract_key: ContractKey,
pub upstream: Option<SocketAddr>,
pub downstream: Vec<SocketAddr>,
pub is_hosting: bool,
pub has_client_subscriptions: bool,
}
#[derive(Debug, Clone)]
pub struct TopologySnapshot {
pub peer_addr: SocketAddr,
pub location: f64,
pub contracts: HashMap<ContractInstanceId, ContractSubscription>,
pub active_subscription_keys: HashSet<ContractInstanceId>,
pub timestamp_nanos: u64,
}
impl TopologySnapshot {
pub fn new(peer_addr: SocketAddr, location: f64) -> Self {
Self {
peer_addr,
location,
contracts: HashMap::new(),
active_subscription_keys: HashSet::new(),
timestamp_nanos: 0,
}
}
pub fn set_contract(
&mut self,
contract_id: ContractInstanceId,
subscription: ContractSubscription,
) {
self.contracts.insert(contract_id, subscription);
}
pub fn get_contract(&self, contract_id: &ContractInstanceId) -> Option<&ContractSubscription> {
self.contracts.get(contract_id)
}
pub fn has_upstream(&self, contract_id: &ContractInstanceId) -> bool {
self.contracts
.get(contract_id)
.map(|s| s.upstream.is_some())
.unwrap_or(false)
}
pub fn is_hosting(&self, contract_id: &ContractInstanceId) -> bool {
self.contracts
.get(contract_id)
.map(|s| s.is_hosting)
.unwrap_or(false)
}
}
pub fn register_topology_snapshot(network_name: &str, snapshot: TopologySnapshot) {
let key = (network_name.to_string(), snapshot.peer_addr);
TOPOLOGY_REGISTRY.insert(key, snapshot);
}
pub fn get_topology_snapshot(
network_name: &str,
peer_addr: &SocketAddr,
) -> Option<TopologySnapshot> {
let key = (network_name.to_string(), *peer_addr);
TOPOLOGY_REGISTRY.get(&key).map(|r| r.value().clone())
}
pub fn get_all_topology_snapshots(network_name: &str) -> Vec<TopologySnapshot> {
let mut snapshots: Vec<_> = TOPOLOGY_REGISTRY
.iter()
.filter(|entry| entry.key().0 == network_name)
.map(|entry| (entry.key().1, entry.value().clone()))
.collect();
snapshots.sort_by(|a, b| {
let addr_a = format!("{}", a.0);
let addr_b = format!("{}", b.0);
addr_a.cmp(&addr_b)
});
snapshots
.into_iter()
.map(|(_, snapshot)| snapshot)
.collect()
}
pub fn clear_topology_snapshots(network_name: &str) {
TOPOLOGY_REGISTRY.retain(|key, _| key.0 != network_name);
}
pub fn clear_all_topology_snapshots() {
TOPOLOGY_REGISTRY.clear();
}
#[derive(Debug, Default)]
pub struct TopologyValidationResult {
pub bidirectional_cycles: Vec<(SocketAddr, SocketAddr)>,
pub orphan_hosters: Vec<(SocketAddr, ContractInstanceId)>,
pub disconnected_upstream: Vec<(SocketAddr, ContractInstanceId)>,
pub unreachable_hosters: Vec<(SocketAddr, ContractInstanceId)>,
pub proximity_violations: Vec<ProximityViolation>,
pub issue_count: usize,
}
impl TopologyValidationResult {
pub fn is_healthy(&self) -> bool {
self.issue_count == 0
}
}
#[derive(Debug, Clone)]
pub struct ProximityViolation {
pub contract_id: ContractInstanceId,
pub downstream_addr: SocketAddr,
pub upstream_addr: SocketAddr,
pub downstream_location: f64,
pub upstream_location: f64,
pub contract_location: f64,
}
pub fn validate_topology(
network_name: &str,
contract_id: &ContractInstanceId,
contract_location: f64,
) -> TopologyValidationResult {
let snapshots = get_all_topology_snapshots(network_name);
validate_topology_from_snapshots(&snapshots, contract_id, contract_location)
}
pub fn validate_topology_from_snapshots(
snapshots: &[TopologySnapshot],
contract_id: &ContractInstanceId,
contract_location: f64,
) -> TopologyValidationResult {
let mut result = TopologyValidationResult::default();
let mut subscription_graph: HashMap<SocketAddr, (Option<SocketAddr>, Vec<SocketAddr>)> =
HashMap::new();
let mut peer_locations: HashMap<SocketAddr, f64> = HashMap::new();
let mut hosters: HashSet<SocketAddr> = HashSet::new();
for snapshot in snapshots {
peer_locations.insert(snapshot.peer_addr, snapshot.location);
if let Some(sub) = snapshot.contracts.get(contract_id) {
subscription_graph.insert(snapshot.peer_addr, (sub.upstream, sub.downstream.clone()));
if sub.is_hosting {
hosters.insert(snapshot.peer_addr);
}
}
}
for (&peer, (upstream, _)) in &subscription_graph {
if let Some(upstream_addr) = upstream {
if let Some((their_upstream, _)) = subscription_graph.get(upstream_addr) {
if their_upstream == &Some(peer) {
let pair = if peer < *upstream_addr {
(peer, *upstream_addr)
} else {
(*upstream_addr, peer)
};
if !result.bidirectional_cycles.contains(&pair) {
result.bidirectional_cycles.push(pair);
result.issue_count += 1;
}
}
}
}
}
const SOURCE_THRESHOLD: f64 = 0.05;
let has_proper_source = hosters.iter().any(|hoster| {
peer_locations
.get(hoster)
.map(|loc| ring_distance(*loc, contract_location) < SOURCE_THRESHOLD)
.unwrap_or(false)
});
let de_facto_sources: HashSet<SocketAddr> = if has_proper_source {
HashSet::new()
} else {
hosters
.iter()
.filter(|hoster| {
subscription_graph
.get(*hoster)
.map(|(upstream, downstream)| upstream.is_none() && !downstream.is_empty())
.unwrap_or(false)
})
.copied()
.collect()
};
for &hoster in &hosters {
if let Some((upstream, downstream)) = subscription_graph.get(&hoster) {
let is_proper_source = peer_locations
.get(&hoster)
.map(|loc| ring_distance(*loc, contract_location) < SOURCE_THRESHOLD)
.unwrap_or(false);
let is_de_facto_source = de_facto_sources.contains(&hoster);
let is_source = is_proper_source || is_de_facto_source;
if !is_source && upstream.is_none() && downstream.is_empty() {
result.orphan_hosters.push((hoster, *contract_id));
result.issue_count += 1;
}
if !is_source && upstream.is_none() && !downstream.is_empty() {
result.disconnected_upstream.push((hoster, *contract_id));
result.issue_count += 1;
}
}
}
for (&peer, (upstream, _)) in &subscription_graph {
if let Some(upstream_addr) = upstream {
let peer_loc = peer_locations.get(&peer).copied().unwrap_or(0.0);
let upstream_loc = peer_locations.get(upstream_addr).copied().unwrap_or(0.0);
let peer_dist = ring_distance(peer_loc, contract_location);
let upstream_dist = ring_distance(upstream_loc, contract_location);
if upstream_dist > peer_dist + 0.1 {
result.proximity_violations.push(ProximityViolation {
contract_id: *contract_id,
downstream_addr: peer,
upstream_addr: *upstream_addr,
downstream_location: peer_loc,
upstream_location: upstream_loc,
contract_location,
});
result.issue_count += 1;
}
}
}
let source_candidates: Vec<_> = peer_locations
.iter()
.filter(|(_, loc)| ring_distance(**loc, contract_location) < SOURCE_THRESHOLD)
.map(|(addr, _)| *addr)
.collect();
if !source_candidates.is_empty() {
let mut reachable: HashSet<SocketAddr> = HashSet::new();
let mut to_visit: Vec<SocketAddr> = source_candidates;
while let Some(peer) = to_visit.pop() {
if reachable.insert(peer) {
if let Some((_, downstream)) = subscription_graph.get(&peer) {
to_visit.extend(downstream.iter().copied());
}
}
}
for &hoster in &hosters {
if !reachable.contains(&hoster) {
result.unreachable_hosters.push((hoster, *contract_id));
result.issue_count += 1;
}
}
}
result
}
fn ring_distance(a: f64, b: f64) -> f64 {
let diff = (a - b).abs();
diff.min(1.0 - diff)
}
#[cfg(test)]
mod tests {
use super::*;
fn make_contract_id(seed: u8) -> ContractInstanceId {
ContractInstanceId::new([seed; 32])
}
fn make_contract_key(seed: u8) -> ContractKey {
use freenet_stdlib::prelude::CodeHash;
ContractKey::from_id_and_code(
ContractInstanceId::new([seed; 32]),
CodeHash::new([seed.wrapping_add(1); 32]),
)
}
#[test]
fn test_bidirectional_cycle_detection() {
let network = "test-bidirectional";
clear_topology_snapshots(network);
let peer_a: SocketAddr = "10.0.1.1:5000".parse().unwrap();
let peer_b: SocketAddr = "10.0.2.1:5000".parse().unwrap();
let contract_id = make_contract_id(1);
let contract_key = make_contract_key(1);
let mut snap_a = TopologySnapshot::new(peer_a, 0.3);
snap_a.set_contract(
contract_id,
ContractSubscription {
contract_key,
upstream: Some(peer_b),
downstream: vec![peer_b], is_hosting: true,
has_client_subscriptions: false,
},
);
let mut snap_b = TopologySnapshot::new(peer_b, 0.4);
snap_b.set_contract(
contract_id,
ContractSubscription {
contract_key,
upstream: Some(peer_a),
downstream: vec![peer_a], is_hosting: true,
has_client_subscriptions: false,
},
);
register_topology_snapshot(network, snap_a);
register_topology_snapshot(network, snap_b);
let result = validate_topology(network, &contract_id, 0.5);
assert!(
!result.bidirectional_cycles.is_empty(),
"Should detect bidirectional cycle"
);
clear_topology_snapshots(network);
}
#[test]
fn test_orphan_hoster_detection() {
let network = "test-orphan";
clear_topology_snapshots(network);
let peer: SocketAddr = "10.0.1.1:5000".parse().unwrap();
let contract_id = make_contract_id(1);
let contract_key = make_contract_key(1);
let mut snap = TopologySnapshot::new(peer, 0.3);
snap.set_contract(
contract_id,
ContractSubscription {
contract_key,
upstream: None,
downstream: vec![],
is_hosting: true,
has_client_subscriptions: false,
},
);
register_topology_snapshot(network, snap);
let result = validate_topology(network, &contract_id, 0.5);
assert!(
!result.orphan_hosters.is_empty(),
"Should detect orphan hoster"
);
clear_topology_snapshots(network);
}
#[test]
fn test_de_facto_source_single_hoster() {
let network = "test-de-facto-source";
clear_topology_snapshots(network);
let peer: SocketAddr = "10.0.1.1:5000".parse().unwrap();
let downstream_peer: SocketAddr = "10.0.2.1:5000".parse().unwrap();
let contract_id = make_contract_id(1);
let contract_key = make_contract_key(1);
let mut snap = TopologySnapshot::new(peer, 0.3);
snap.set_contract(
contract_id,
ContractSubscription {
contract_key,
upstream: None,
downstream: vec![downstream_peer],
is_hosting: true,
has_client_subscriptions: false,
},
);
register_topology_snapshot(network, snap);
let result = validate_topology(network, &contract_id, 0.5);
assert!(
result.disconnected_upstream.is_empty(),
"Single hoster acting as tree root should be valid de-facto source, not disconnected"
);
assert!(result.is_healthy(), "Topology should be healthy");
clear_topology_snapshots(network);
}
#[test]
fn test_disconnected_upstream_with_proper_source() {
let network = "test-disconnected-with-source";
clear_topology_snapshots(network);
let source_peer: SocketAddr = "10.0.1.1:5000".parse().unwrap();
let disconnected_peer: SocketAddr = "10.0.2.1:5000".parse().unwrap();
let downstream_peer: SocketAddr = "10.0.3.1:5000".parse().unwrap();
let contract_id = make_contract_id(1);
let contract_key = make_contract_key(1);
let mut source_snap = TopologySnapshot::new(source_peer, 0.52);
source_snap.set_contract(
contract_id,
ContractSubscription {
contract_key,
upstream: None,
downstream: vec![],
is_hosting: true,
has_client_subscriptions: false,
},
);
let mut disconnected_snap = TopologySnapshot::new(disconnected_peer, 0.3);
disconnected_snap.set_contract(
contract_id,
ContractSubscription {
contract_key,
upstream: None,
downstream: vec![downstream_peer], is_hosting: true,
has_client_subscriptions: false,
},
);
register_topology_snapshot(network, source_snap);
register_topology_snapshot(network, disconnected_snap);
let result = validate_topology(network, &contract_id, 0.5);
assert!(
!result.disconnected_upstream.is_empty(),
"Should detect disconnected upstream when proper source exists"
);
clear_topology_snapshots(network);
}
#[test]
fn test_ring_distance_wrap_around() {
let network = "test-wrap-around";
clear_topology_snapshots(network);
let peer: SocketAddr = "10.0.1.1:5000".parse().unwrap();
let contract_id = make_contract_id(1);
let contract_key = make_contract_key(1);
let mut snap = TopologySnapshot::new(peer, 0.99);
snap.set_contract(
contract_id,
ContractSubscription {
contract_key,
upstream: None,
downstream: vec![],
is_hosting: true,
has_client_subscriptions: false,
},
);
register_topology_snapshot(network, snap);
let result = validate_topology(network, &contract_id, 0.02);
assert!(
result.orphan_hosters.is_empty(),
"Peer at 0.99 should be considered source for contract at 0.02 (ring distance 0.03)"
);
assert!(
result.disconnected_upstream.is_empty(),
"Peer at 0.99 should be considered source, not disconnected upstream"
);
clear_topology_snapshots(network);
}
#[test]
fn test_source_hoster_not_orphan() {
let network = "test-source-not-orphan";
clear_topology_snapshots(network);
let peer: SocketAddr = "10.0.1.1:5000".parse().unwrap();
let contract_id = make_contract_id(1);
let contract_key = make_contract_key(1);
let mut snap = TopologySnapshot::new(peer, 0.5);
snap.set_contract(
contract_id,
ContractSubscription {
contract_key,
upstream: None,
downstream: vec![], is_hosting: true,
has_client_subscriptions: false,
},
);
register_topology_snapshot(network, snap);
let result = validate_topology(network, &contract_id, 0.5);
assert!(
result.orphan_hosters.is_empty(),
"Source hoster should not be flagged as orphan"
);
clear_topology_snapshots(network);
}
#[test]
fn test_proximity_violation_detection() {
let network = "test-proximity-violation";
clear_topology_snapshots(network);
let peer: SocketAddr = "10.0.1.1:5000".parse().unwrap();
let upstream_peer: SocketAddr = "10.0.2.1:5000".parse().unwrap();
let contract_id = make_contract_id(1);
let contract_key = make_contract_key(1);
let mut snap_downstream = TopologySnapshot::new(peer, 0.45);
snap_downstream.set_contract(
contract_id,
ContractSubscription {
contract_key,
upstream: Some(upstream_peer),
downstream: vec![],
is_hosting: true,
has_client_subscriptions: false,
},
);
let mut snap_upstream = TopologySnapshot::new(upstream_peer, 0.8);
snap_upstream.set_contract(
contract_id,
ContractSubscription {
contract_key,
upstream: None,
downstream: vec![peer],
is_hosting: true,
has_client_subscriptions: false,
},
);
register_topology_snapshot(network, snap_downstream);
register_topology_snapshot(network, snap_upstream);
let result = validate_topology(network, &contract_id, 0.5);
assert!(
!result.proximity_violations.is_empty(),
"Should detect proximity violation when upstream is farther from contract than downstream"
);
let violation = &result.proximity_violations[0];
assert_eq!(violation.downstream_addr, peer);
assert_eq!(violation.upstream_addr, upstream_peer);
clear_topology_snapshots(network);
}
#[test]
fn test_unreachable_hoster_detection() {
let network = "test-unreachable";
clear_topology_snapshots(network);
let source_peer: SocketAddr = "10.0.1.1:5000".parse().unwrap();
let reachable_peer: SocketAddr = "10.0.2.1:5000".parse().unwrap();
let unreachable_peer: SocketAddr = "10.0.3.1:5000".parse().unwrap();
let contract_id = make_contract_id(1);
let contract_key = make_contract_key(1);
let mut snap_source = TopologySnapshot::new(source_peer, 0.5);
snap_source.set_contract(
contract_id,
ContractSubscription {
contract_key,
upstream: None,
downstream: vec![reachable_peer],
is_hosting: true,
has_client_subscriptions: false,
},
);
let mut snap_reachable = TopologySnapshot::new(reachable_peer, 0.6);
snap_reachable.set_contract(
contract_id,
ContractSubscription {
contract_key,
upstream: Some(source_peer),
downstream: vec![],
is_hosting: true,
has_client_subscriptions: false,
},
);
let mut snap_unreachable = TopologySnapshot::new(unreachable_peer, 0.7);
snap_unreachable.set_contract(
contract_id,
ContractSubscription {
contract_key,
upstream: None, downstream: vec![],
is_hosting: true,
has_client_subscriptions: false,
},
);
register_topology_snapshot(network, snap_source);
register_topology_snapshot(network, snap_reachable);
register_topology_snapshot(network, snap_unreachable);
let result = validate_topology(network, &contract_id, 0.5);
assert!(
!result.unreachable_hosters.is_empty(),
"Should detect unreachable hoster"
);
assert!(
result
.unreachable_hosters
.iter()
.any(|(addr, _)| *addr == unreachable_peer),
"Unreachable peer should be in the list"
);
clear_topology_snapshots(network);
}
#[test]
fn test_healthy_topology_no_issues() {
let network = "test-healthy";
clear_topology_snapshots(network);
let source_peer: SocketAddr = "10.0.1.1:5000".parse().unwrap();
let downstream_peer: SocketAddr = "10.0.2.1:5000".parse().unwrap();
let contract_id = make_contract_id(1);
let contract_key = make_contract_key(1);
let mut snap_source = TopologySnapshot::new(source_peer, 0.5);
snap_source.set_contract(
contract_id,
ContractSubscription {
contract_key,
upstream: None,
downstream: vec![downstream_peer],
is_hosting: true,
has_client_subscriptions: false,
},
);
let mut snap_downstream = TopologySnapshot::new(downstream_peer, 0.6);
snap_downstream.set_contract(
contract_id,
ContractSubscription {
contract_key,
upstream: Some(source_peer),
downstream: vec![],
is_hosting: true,
has_client_subscriptions: false,
},
);
register_topology_snapshot(network, snap_source);
register_topology_snapshot(network, snap_downstream);
let result = validate_topology(network, &contract_id, 0.5);
assert!(
result.is_healthy(),
"Healthy topology should have no issues, got: cycles={}, orphans={}, disconnected={}, unreachable={}, proximity={}",
result.bidirectional_cycles.len(),
result.orphan_hosters.len(),
result.disconnected_upstream.len(),
result.unreachable_hosters.len(),
result.proximity_violations.len()
);
clear_topology_snapshots(network);
}
#[test]
fn test_mutual_downstream_detection_with_proper_source_issue_2773() {
let network = "test-mutual-downstream-with-source";
clear_topology_snapshots(network);
let source_peer: SocketAddr = "10.0.0.1:5000".parse().unwrap();
let peer_a: SocketAddr = "10.0.1.1:5000".parse().unwrap();
let peer_b: SocketAddr = "10.0.2.1:5000".parse().unwrap();
let contract_id = make_contract_id(1);
let contract_key = make_contract_key(1);
let mut snap_source = TopologySnapshot::new(source_peer, 0.51);
snap_source.set_contract(
contract_id,
ContractSubscription {
contract_key,
upstream: None,
downstream: vec![], is_hosting: true,
has_client_subscriptions: false,
},
);
let mut snap_a = TopologySnapshot::new(peer_a, 0.2);
snap_a.set_contract(
contract_id,
ContractSubscription {
contract_key,
upstream: None, downstream: vec![peer_b], is_hosting: true,
has_client_subscriptions: false,
},
);
let mut snap_b = TopologySnapshot::new(peer_b, 0.3);
snap_b.set_contract(
contract_id,
ContractSubscription {
contract_key,
upstream: None, downstream: vec![peer_a], is_hosting: true,
has_client_subscriptions: false,
},
);
register_topology_snapshot(network, snap_source);
register_topology_snapshot(network, snap_a);
register_topology_snapshot(network, snap_b);
let result = validate_topology(network, &contract_id, 0.5);
assert_eq!(
result.disconnected_upstream.len(),
2,
"Both peers should be flagged as disconnected upstream when proper source exists, found: {:?}",
result.disconnected_upstream
);
let disconnected_addrs: Vec<_> = result
.disconnected_upstream
.iter()
.map(|(a, _)| *a)
.collect();
assert!(
disconnected_addrs.contains(&peer_a),
"Peer A should be in disconnected_upstream"
);
assert!(
disconnected_addrs.contains(&peer_b),
"Peer B should be in disconnected_upstream"
);
assert!(
result.bidirectional_cycles.is_empty(),
"Mutual downstream is not a bidirectional cycle (no upstream established)"
);
assert!(
!result.is_healthy(),
"Mutual downstream should be flagged as unhealthy"
);
clear_topology_snapshots(network);
}
#[test]
fn test_mutual_downstream_without_source_not_detected_limitation() {
let network = "test-mutual-downstream-no-source";
clear_topology_snapshots(network);
let peer_a: SocketAddr = "10.0.1.1:5000".parse().unwrap();
let peer_b: SocketAddr = "10.0.2.1:5000".parse().unwrap();
let contract_id = make_contract_id(1);
let contract_key = make_contract_key(1);
let mut snap_a = TopologySnapshot::new(peer_a, 0.2);
snap_a.set_contract(
contract_id,
ContractSubscription {
contract_key,
upstream: None,
downstream: vec![peer_b],
is_hosting: true,
has_client_subscriptions: false,
},
);
let mut snap_b = TopologySnapshot::new(peer_b, 0.3);
snap_b.set_contract(
contract_id,
ContractSubscription {
contract_key,
upstream: None,
downstream: vec![peer_a],
is_hosting: true,
has_client_subscriptions: false,
},
);
register_topology_snapshot(network, snap_a);
register_topology_snapshot(network, snap_b);
let result = validate_topology(network, &contract_id, 0.5);
assert!(
result.disconnected_upstream.is_empty(),
"KNOWN LIMITATION: Mutual downstream without proper source is not detected"
);
assert!(
result.is_healthy(),
"KNOWN LIMITATION: Mutual downstream appears healthy when no source exists"
);
clear_topology_snapshots(network);
}
}