use std::{
collections::HashMap,
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
thread,
};
use scion_proto::address::IsdAsn;
use crate::network::scion::topology::{FastTopologyLookup, ScionAs, ScionLink};
pub trait TopologyLinkVisitor: Clone + Send {
type Output: Send;
fn visit(&mut self, used_link: Option<&ScionLink>, current_as: &ScionAs);
fn finish(self, final_link: bool) -> Option<Self::Output>;
#[expect(unused_variables)]
fn should_follow_link(
&self,
current_as: &ScionAs,
next_link: &ScionLink,
next_as: &ScionAs,
) -> bool {
true
}
}
pub fn walk_all_links<'topo, Visitor: TopologyLinkVisitor>(
visitor: Visitor,
start_as: IsdAsn,
topo_lookup: &FastTopologyLookup<'topo>,
) -> Vec<Visitor::Output> {
walk_all_links_parallel(visitor, start_as, topo_lookup, 1)
}
pub fn walk_all_links_parallel<'topo, Visitor: TopologyLinkVisitor>(
visitor: Visitor,
start_as: IsdAsn,
topo_lookup: &FastTopologyLookup<'topo>,
max_threads: usize,
) -> Vec<Visitor::Output> {
let max_extra_threads = max_threads.max(1) - 1;
let Some(start_as) = topo_lookup.topology.as_map.get(&start_as) else {
return vec![];
};
let index = Arc::new(BitsetIndex::new(
topo_lookup.topology.as_map.keys().copied(),
));
let visited = VisitedBitset::new(index);
let thread_budget = Arc::new(ThreadBudget::new(max_extra_threads));
let mut results = Vec::new();
visit_recurse(
start_as,
None,
visitor,
visited,
&mut results,
topo_lookup,
max_extra_threads,
&thread_budget,
);
results
}
struct Branch<'topo, V: TopologyLinkVisitor> {
next_as: &'topo ScionAs,
link: &'topo ScionLink,
visitor: V,
}
fn visit_recurse<'topo, Visitor: TopologyLinkVisitor>(
current_as: &'topo ScionAs,
used_link: Option<&ScionLink>,
mut visitor: Visitor,
mut visited: VisitedBitset<4>,
result_collector: &mut Vec<Visitor::Output>,
topo_lookup: &FastTopologyLookup<'topo>,
max_threads: usize,
thread_budget: &Arc<ThreadBudget>,
) {
let current_as_id = current_as.isd_as();
if !visited.insert(current_as_id) {
return; }
visitor.visit(used_link, current_as);
let empty_vec = Vec::new();
let links = topo_lookup
.as_to_link_map
.get(¤t_as_id)
.unwrap_or(&empty_vec);
let mut branches: Vec<Branch<'topo, Visitor>> = Vec::new();
for link in links {
if Some(*link) == used_link {
continue;
}
let Some(next_interface) = link.get_peer(¤t_as_id) else {
debug_assert!(false, "Link {link} has no peer for AS {current_as:?}");
continue; };
let Some(next_as) = topo_lookup.topology.as_map.get(&next_interface.isd_as) else {
debug_assert!(false, "Missing as in topology: {next_interface:?}");
continue; };
if visitor.should_follow_link(current_as, link, next_as) {
if visited.contains(next_as.isd_as()) {
continue;
}
branches.push(Branch {
next_as,
link,
visitor: visitor.clone(),
});
}
}
let has_branched = !branches.is_empty();
const MIN_BRANCHES_FOR_THREADING: usize = 3;
const MIN_UNVISITED_FOR_THREADING: usize = 5;
let ok_unvisited = visited.unvisited_count() > MIN_UNVISITED_FOR_THREADING;
let ok_branches = branches.len() >= MIN_BRANCHES_FOR_THREADING;
let may_multithread = max_threads > 1 && ok_unvisited && ok_branches;
match may_multithread {
false => {
for branch in branches {
visit_recurse(
branch.next_as,
Some(branch.link),
branch.visitor,
visited.clone(),
result_collector,
topo_lookup,
max_threads,
thread_budget,
);
}
}
true => {
thread::scope(|scope| {
let mut handles = Vec::new();
for branch in branches {
let permit = thread_budget.try_acquire();
match permit {
Ok(permit) => {
let visited_clone = visited.clone();
let mut result_collector = Vec::new();
let active = Arc::clone(thread_budget);
handles.push(scope.spawn(move || {
visit_recurse(
branch.next_as,
Some(branch.link),
branch.visitor,
visited_clone,
&mut result_collector,
topo_lookup,
max_threads,
&active,
);
drop(permit); result_collector
}));
}
Err(_) => {
visit_recurse(
branch.next_as,
Some(branch.link),
branch.visitor,
visited.clone(),
result_collector,
topo_lookup,
max_threads,
thread_budget,
);
}
}
}
for handle in handles {
let thread_results = handle.join().expect("visitor thread panicked");
result_collector.extend(thread_results);
}
})
}
}
result_collector.extend(visitor.finish(!has_branched));
}
struct BitsetIndex {
map: HashMap<IsdAsn, u64>,
}
impl BitsetIndex {
fn new(ases: impl Iterator<Item = IsdAsn>) -> Self {
let map = ases
.enumerate()
.map(|(i, as_id)| (as_id, i as u64))
.collect();
Self { map }
}
fn get(&self, as_id: IsdAsn) -> Option<u64> {
self.map.get(&as_id).copied()
}
}
#[derive(Clone)]
struct VisitedBitset<const N: usize> {
index: Arc<BitsetIndex>,
visited: [u128; N],
}
impl<const N: usize> VisitedBitset<N> {
fn new(index: Arc<BitsetIndex>) -> Self {
if index.map.len() > N * 128 {
panic!(
"Topology has more than {} ASes, this is currently not supported",
N * 128
);
}
Self {
index,
visited: [0; N],
}
}
fn unvisited_count(&self) -> usize {
let total_ases = self.index.map.len();
let visited_count = self
.visited
.iter()
.map(|bits| bits.count_ones() as usize)
.sum::<usize>();
total_ases - visited_count
}
fn contains(&self, as_id: IsdAsn) -> bool {
let Some(bit_index) = self.index.get(as_id) else {
return false;
};
let array_index = (bit_index / 128) as usize;
let bit_position = bit_index % 128;
if array_index >= self.visited.len() {
return false;
}
self.visited[array_index] & (1 << bit_position) != 0
}
fn insert(&mut self, as_id: IsdAsn) -> bool {
let Some(bit_index) = self.index.get(as_id) else {
panic!("AS {} not found in BitsetIndex", as_id);
};
let array_index = (bit_index / 128) as usize;
let bit_position = bit_index % 128;
if array_index >= self.visited.len() {
panic!(
"Bit index {} for AS {} exceeds visited bitset size",
bit_index, as_id
);
}
let bit = 1 << bit_position;
let already_visited = self.visited[array_index] & bit != 0;
self.visited[array_index] |= bit;
!already_visited
}
}
struct ThreadBudget {
available: AtomicUsize,
}
impl ThreadBudget {
fn new(permits: usize) -> Self {
Self {
available: AtomicUsize::new(permits),
}
}
fn try_acquire(&self) -> Result<ThreadPermit<'_>, ()> {
let mut current = self.available.load(Ordering::Relaxed);
loop {
if current == 0 {
return Err(());
}
match self.available.compare_exchange_weak(
current,
current - 1,
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(_) => return Ok(ThreadPermit { budget: self }),
Err(updated) => current = updated,
}
}
}
}
struct ThreadPermit<'a> {
budget: &'a ThreadBudget,
}
impl Drop for ThreadPermit<'_> {
fn drop(&mut self) {
self.budget.available.fetch_add(1, Ordering::Release);
}
}