use std::collections::BTreeMap;
use std::time::Duration;
use crate::adapter::net::behavior::{
capability::CapabilityIndex, predicate::EvalContext, predicate::Predicate, tag::Tag,
tag::TagKey, tag::TaxonomyAxis,
};
/// Kind of edge that a graph-style traversal over the capability index can follow.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EdgeKind {
    /// Edge encoded by a `fork-of:` reserved tag (presumably linking a fork to
    /// the parent it was forked from).
    ForkOfParent,
}

impl EdgeKind {
    /// Returns the reserved-tag prefix, trailing colon included, that encodes
    /// this edge kind.
    pub fn prefix(&self) -> &'static str {
        match *self {
            Self::ForkOfParent => "fork-of:",
        }
    }
}
/// Distance to a node, expressed as a wrapped [`Duration`].
///
/// `nearest` builds it from the value returned by its RTT lookup closure.
/// Ordering is derived, so distances compare exactly like the inner durations.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Distance(pub Duration);
/// Streaming reducer driven by `CapabilityQuery::aggregate`.
///
/// The `CapabilityIndex` implementation of `aggregate` calls [`observe`](Aggregator::observe)
/// once per node whose capabilities satisfy the predicate, then consumes the
/// aggregator with [`finalize`](Aggregator::finalize).
pub trait Aggregator {
/// Result type produced by `finalize`.
type Output;
/// Feeds one matching node (its id and capability set) into the aggregator.
fn observe(
&mut self,
node_id: u64,
caps: &crate::adapter::net::behavior::capability::CapabilitySet,
);
/// Consumes the aggregator and returns the accumulated result.
fn finalize(self) -> Self::Output;
}
/// Aggregator that counts how many nodes it has observed; starts at zero via `Default`.
#[derive(Default)]
pub struct Count(usize);
/// Counts observed nodes; the node id and capabilities are never inspected.
impl Aggregator for Count {
    type Output = usize;

    /// Bumps the counter by one for each observed node.
    fn observe(
        &mut self,
        _node_id: u64,
        _caps: &crate::adapter::net::behavior::capability::CapabilitySet,
    ) {
        let Self(observed) = self;
        *observed += 1;
    }

    /// Returns the number of `observe` calls made on this counter.
    fn finalize(self) -> usize {
        let Self(total) = self;
        total
    }
}
/// Aggregator that collects the distinct values seen for one `(axis, key)` pair
/// across all observed nodes.
pub struct UniqueAxisValues {
/// Axis a valued tag must be on to be collected.
axis: TaxonomyAxis,
/// Key a valued tag must carry to be collected.
key: String,
/// Distinct values gathered so far; a `BTreeSet` keeps them deduplicated and sorted.
seen: std::collections::BTreeSet<String>,
}
impl UniqueAxisValues {
    /// Creates an aggregator for `key` on `axis` with no values collected yet.
    pub fn new(axis: TaxonomyAxis, key: impl Into<String>) -> Self {
        let key = key.into();
        let seen = std::collections::BTreeSet::new();
        Self { axis, key, seen }
    }
}
/// Gathers the values of every `Tag::AxisValue` matching the configured axis and key.
impl Aggregator for UniqueAxisValues {
    type Output = Vec<String>;

    /// Records the value of each valued tag in `caps` whose axis and key both
    /// match this aggregator; all other tags are ignored.
    fn observe(
        &mut self,
        _node_id: u64,
        caps: &crate::adapter::net::behavior::capability::CapabilitySet,
    ) {
        // Borrow the two filter fields up front so the closure below does not
        // hold on to `self` while `self.seen` is being extended.
        let wanted_axis = &self.axis;
        let wanted_key = &self.key;
        let matching_values = caps.tags.iter().filter_map(|tag| match tag {
            Tag::AxisValue {
                axis, key, value, ..
            } if axis == wanted_axis && key == wanted_key => Some(value.clone()),
            _ => None,
        });
        self.seen.extend(matching_values);
    }

    /// Returns the collected values in the set's ascending order.
    fn finalize(self) -> Vec<String> {
        let Self { seen, .. } = self;
        seen.into_iter().collect()
    }
}
/// Aggregator that tracks the largest finite numeric value found under one
/// metadata key across all observed nodes.
pub struct MaxNumericMetadata {
/// Metadata key whose value is parsed as `f64`.
key: String,
/// Largest finite value seen so far; `None` until one parses successfully.
current_max: Option<f64>,
}
impl MaxNumericMetadata {
    /// Creates an aggregator for the metadata entry named `key`, with no
    /// maximum recorded yet.
    pub fn new(key: impl Into<String>) -> Self {
        let key = key.into();
        Self {
            key,
            current_max: None,
        }
    }
}
/// Keeps the maximum of the finite `f64` values parsed from the configured metadata key.
impl Aggregator for MaxNumericMetadata {
    type Output = Option<f64>;

    /// Folds this node's metadata value into the running maximum.
    ///
    /// Nodes are skipped when the key is absent, the value does not parse as
    /// `f64`, or the parsed value is not finite (NaN or infinite).
    fn observe(
        &mut self,
        _node_id: u64,
        caps: &crate::adapter::net::behavior::capability::CapabilitySet,
    ) {
        let candidate = caps
            .metadata
            .get(&self.key)
            .and_then(|raw| raw.parse::<f64>().ok())
            .filter(|value| value.is_finite());
        if let Some(value) = candidate {
            // Only replace the stored maximum when the new value is strictly larger.
            let already_covered = matches!(self.current_max, Some(best) if best >= value);
            if !already_covered {
                self.current_max = Some(value);
            }
        }
    }

    /// Returns the maximum, or `None` if no node contributed a usable value.
    fn finalize(self) -> Option<f64> {
        let Self { current_max, .. } = self;
        current_max
    }
}
/// Read-only query operations over a collection of per-node capability sets.
///
/// The behaviour notes on each method describe what the `CapabilityIndex`
/// implementation in this file does.
pub trait CapabilityQuery {
/// Returns `(node_id, capabilities)` for every node whose capabilities
/// satisfy `predicate`.
fn filter(
&self,
predicate: &Predicate,
) -> Vec<(
u64,
crate::adapter::net::behavior::capability::CapabilitySet,
)>;
/// Returns the nodes that carry a tag on `axis` under `key`.
///
/// With `value == None` a presence tag or any valued tag for the key matches;
/// with `Some(v)` only a valued tag whose value equals `v` matches.
fn match_axis(
&self,
axis: TaxonomyAxis,
key: &str,
value: Option<&str>,
) -> Vec<(
u64,
crate::adapter::net::behavior::capability::CapabilitySet,
)>;
/// Feeds every node matching `predicate` into `agg` and returns its final output.
fn aggregate<A>(&self, predicate: &Predicate, agg: A) -> A::Output
where
A: Aggregator;
/// Follows `edge` links starting from `start_node` / `start_tag`, taking at
/// most `max_depth` hops, and returns the visited path with the start entry first.
fn traverse(
&self,
start_node: u64,
start_tag: &Tag,
edge: EdgeKind,
max_depth: u32,
) -> Vec<(u64, Tag)>;
/// Returns up to `n` nodes matching `predicate`, ordered by the distance that
/// `rtt_lookup` reports for each node id; nodes without a measurement
/// (`None`) sort after all measured ones.
fn nearest<F: Fn(u64) -> Option<Duration>>(
&self,
predicate: &Predicate,
rtt_lookup: F,
n: usize,
) -> Vec<(
u64,
crate::adapter::net::behavior::capability::CapabilitySet,
Option<Distance>,
)>;
}
/// Query operations implemented as scans over every node returned by `all_nodes()`.
impl CapabilityQuery for CapabilityIndex {
    /// Returns `(node_id, capabilities)` for every node whose capabilities
    /// satisfy `predicate`, in `all_nodes()` iteration order.
    fn filter(
        &self,
        predicate: &Predicate,
    ) -> Vec<(
        u64,
        crate::adapter::net::behavior::capability::CapabilitySet,
    )> {
        let mut out = Vec::new();
        for node_id in self.all_nodes() {
            // `with_caps` wraps the closure result in an `Option` (presumably
            // `None` when the node has no entry); flattening merges that with
            // the closure's own "did not match" `None`.
            let matched: Option<crate::adapter::net::behavior::capability::CapabilitySet> = self
                .with_caps(node_id, |caps| {
                    predicate_accepts(predicate, caps).then(|| caps.clone())
                })
                .flatten();
            if let Some(caps) = matched {
                out.push((node_id, caps));
            }
        }
        out
    }

    /// Returns the nodes carrying a tag on `axis` under `key`.
    ///
    /// With `value == None` a presence tag or any valued tag for the key
    /// matches; with `Some(v)` only a valued tag whose value equals `v` matches.
    fn match_axis(
        &self,
        axis: TaxonomyAxis,
        key: &str,
        value: Option<&str>,
    ) -> Vec<(
        u64,
        crate::adapter::net::behavior::capability::CapabilitySet,
    )> {
        let mut out = Vec::new();
        for node_id in self.all_nodes() {
            let matched: Option<crate::adapter::net::behavior::capability::CapabilitySet> = self
                .with_caps(node_id, |caps| {
                    axis_match(caps, axis, key, value).then(|| caps.clone())
                })
                .flatten();
            if let Some(caps) = matched {
                out.push((node_id, caps));
            }
        }
        out
    }

    /// Calls `agg.observe` for every node matching `predicate`, then returns
    /// `agg.finalize()`. Capability sets are borrowed, not cloned.
    fn aggregate<A>(&self, predicate: &Predicate, mut agg: A) -> A::Output
    where
        A: Aggregator,
    {
        for node_id in self.all_nodes() {
            self.with_caps(node_id, |caps| {
                if predicate_accepts(predicate, caps) {
                    agg.observe(node_id, caps);
                }
            });
        }
        agg.finalize()
    }

    /// Walks `edge` links upward from `start_node`, taking at most `max_depth` hops.
    ///
    /// The returned path always begins with `(start_node, start_tag)`. Each hop
    /// takes the body of the current edge tag, looks up the first node hosting
    /// the reserved tag `causal:<body>`, and appends `(that node, that causal tag)`.
    /// The walk stops early when the current tag is not an `edge` tag, when no
    /// host is found, when the host was already visited (cycle guard), or when
    /// the host carries no further `edge` tag.
    fn traverse(
        &self,
        start_node: u64,
        start_tag: &Tag,
        edge: EdgeKind,
        max_depth: u32,
    ) -> Vec<(u64, Tag)> {
        let mut path = vec![(start_node, start_tag.clone())];
        if max_depth == 0 {
            return path;
        }
        let mut visited: std::collections::BTreeSet<u64> = std::collections::BTreeSet::new();
        visited.insert(start_node);
        let mut current_tag = start_tag.clone();
        for _ in 0..max_depth {
            // Only a reserved tag with the edge's prefix names a parent.
            let body = match &current_tag {
                Tag::Reserved { prefix, body } if prefix == edge.prefix() => body.clone(),
                _ => break,
            };
            let parent_lookup_tag = Tag::Reserved {
                prefix: "causal:".to_string(),
                body,
            };
            let Some(parent_node) = self.find_first_host(&parent_lookup_tag) else {
                break;
            };
            // `insert` returns false for an already-visited node: stop instead of looping.
            if !visited.insert(parent_node) {
                break;
            }
            // First edge tag on the parent, if any, drives the next hop.
            let next_edge: Option<Tag> = self
                .with_caps(parent_node, |caps| {
                    caps.tags.iter().find_map(|t| match t {
                        Tag::Reserved { prefix, .. } if prefix == edge.prefix() => Some(t.clone()),
                        _ => None,
                    })
                })
                .flatten();
            path.push((parent_node, parent_lookup_tag));
            match next_edge {
                Some(t) => current_tag = t,
                None => break,
            }
        }
        path
    }

    /// Returns up to `n` nodes matching `predicate`, nearest first.
    ///
    /// Nodes with a measured distance are ordered ascending; nodes for which
    /// `rtt_lookup` returns `None` follow them. Ties, and the unmeasured group,
    /// are ordered by node id. `n == 0` returns an empty vector without
    /// evaluating the predicate.
    fn nearest<F: Fn(u64) -> Option<Duration>>(
        &self,
        predicate: &Predicate,
        rtt_lookup: F,
        n: usize,
    ) -> Vec<(
        u64,
        crate::adapter::net::behavior::capability::CapabilitySet,
        Option<Distance>,
    )> {
        if n == 0 {
            return Vec::new();
        }
        let mut ranked: Vec<(
            u64,
            crate::adapter::net::behavior::capability::CapabilitySet,
            Option<Distance>,
        )> = self
            .filter(predicate)
            .into_iter()
            .map(|(id, caps)| {
                let dist = rtt_lookup(id).map(Distance);
                (id, caps, dist)
            })
            .collect();
        ranked.sort_by(|a, b| match (a.2, b.2) {
            (Some(da), Some(db)) => da.cmp(&db).then_with(|| a.0.cmp(&b.0)),
            (Some(_), None) => std::cmp::Ordering::Less,
            (None, Some(_)) => std::cmp::Ordering::Greater,
            (None, None) => a.0.cmp(&b.0),
        });
        ranked.truncate(n);
        ranked
    }
}

/// Evaluates `predicate` against one node's capability set.
///
/// The tags are copied into a `Vec` first because `EvalContext::new` is handed
/// a reference to that vector alongside the metadata map.
fn predicate_accepts(
    predicate: &Predicate,
    caps: &crate::adapter::net::behavior::capability::CapabilitySet,
) -> bool {
    let owned_tags: Vec<Tag> = caps.tags.iter().cloned().collect();
    let ctx = EvalContext::new(&owned_tags, &caps.metadata);
    predicate.evaluate_unplanned(&ctx)
}
impl CapabilityIndex {
    /// Returns the first node, in `all_nodes()` order, whose tag set contains
    /// `tag`, or `None` when no node hosts it.
    fn find_first_host(&self, tag: &Tag) -> Option<u64> {
        self.all_nodes().into_iter().find(|&candidate| {
            let hosts_tag = self.with_caps(candidate, |caps| caps.tags.contains(tag));
            // A `None` from `with_caps` is treated the same as "does not host the tag".
            hosts_tag.unwrap_or(false)
        })
    }
}
/// Reports whether `caps` carries a tag on `axis` under `key`.
///
/// * `value == None`: a presence tag, or any valued tag, for that axis and key matches.
/// * `value == Some(target)`: only a valued tag whose value equals `target`
///   matches; presence tags never do.
fn axis_match(
    caps: &crate::adapter::net::behavior::capability::CapabilitySet,
    axis: TaxonomyAxis,
    key: &str,
    value: Option<&str>,
) -> bool {
    caps.tags.iter().any(|candidate| match candidate {
        Tag::AxisPresent {
            axis: found_axis,
            key: found_key,
        } => *found_axis == axis && found_key == key && value.is_none(),
        Tag::AxisValue {
            axis: found_axis,
            key: found_key,
            value: found_value,
            ..
        } => {
            let same_slot = *found_axis == axis && found_key == key;
            same_slot
                && match value {
                    None => true,
                    Some(target) => found_value == target,
                }
        }
        _ => false,
    })
}
// Unused empty map constant. NOTE(review): presumably kept only so the
// `BTreeMap` import at the top of the file stays referenced — confirm before removing.
#[allow(dead_code)]
const _DOC_LINK: BTreeMap<String, String> = BTreeMap::new();
// Not called by non-test code in this file. NOTE(review): presumably kept only so
// the `TagKey` import stays referenced — confirm before removing.
#[allow(dead_code)]
/// Builds a [`TagKey`] for `k` on `axis`, copying `k` into an owned `String`.
fn _doc_link_tag_key(axis: TaxonomyAxis, k: &str) -> TagKey {
    let owned_key = String::from(k);
    TagKey::new(axis, owned_key)
}
// Unit tests for the `CapabilityQuery` implementation on `CapabilityIndex`.
#[cfg(test)]
mod tests {
use super::*;
use crate::adapter::net::behavior::capability::{
CapabilityAnnouncement, CapabilityIndex, CapabilitySet,
};
use crate::adapter::net::identity::EntityId;
use std::sync::Arc;
// Shared fixture: an index holding four nodes.
//   0x1111 - gpu, memory_gb=64, region us-east, intent ml-training
//   0x2222 - cpu_cores=64, memory_gb=32, region us-east
//   0x3333 - gpu, memory_gb=16, region us-west, intent ml-training
//   0x4444 - cpu_cores=16, region eu-central (no memory_gb tag)
fn idx() -> Arc<CapabilityIndex> {
let i = Arc::new(CapabilityIndex::new());
let eid = EntityId::from_bytes([0u8; 32]);
let nodes = [
(
0x1111u64,
CapabilitySet::default()
.add_tag("hardware.gpu")
.add_tag("hardware.memory_gb=64")
.with_metadata("region", "us-east")
.with_metadata("intent", "ml-training"),
),
(
0x2222,
CapabilitySet::default()
.add_tag("hardware.cpu_cores=64")
.add_tag("hardware.memory_gb=32")
.with_metadata("region", "us-east"),
),
(
0x3333,
CapabilitySet::default()
.add_tag("hardware.gpu")
.add_tag("hardware.memory_gb=16")
.with_metadata("region", "us-west")
.with_metadata("intent", "ml-training"),
),
(
0x4444,
CapabilitySet::default()
.add_tag("hardware.cpu_cores=16")
.with_metadata("region", "eu-central"),
),
];
for (id, caps) in nodes {
i.index(CapabilityAnnouncement::new(id, eid.clone(), 1, caps));
}
i
}
// Presence query (`value == None`) returns both nodes tagged `hardware.gpu`.
#[test]
fn match_axis_presence_finds_all_gpu_nodes() {
let i = idx();
let mut got: Vec<u64> = i
.match_axis(TaxonomyAxis::Hardware, "gpu", None)
.into_iter()
.map(|(n, _)| n)
.collect();
got.sort();
assert_eq!(got, vec![0x1111, 0x3333]);
}
// Value query returns only the node whose `memory_gb` value is exactly "64".
#[test]
fn match_axis_with_value_matches_only_exact() {
let i = idx();
let mut got: Vec<u64> = i
.match_axis(TaxonomyAxis::Hardware, "memory_gb", Some("64"))
.into_iter()
.map(|(n, _)| n)
.collect();
got.sort();
assert_eq!(got, vec![0x1111]);
}
// An AND of a tag-exists predicate and a metadata-equals predicate selects
// the two gpu nodes that also declare intent "ml-training".
#[test]
fn filter_with_composite_predicate() {
let i = idx();
let pred = Predicate::and(vec![
Predicate::exists(TagKey::new(TaxonomyAxis::Hardware, "gpu".to_string())),
Predicate::metadata_equals("intent", "ml-training"),
]);
let mut got: Vec<u64> = i.filter(&pred).into_iter().map(|(n, _)| n).collect();
got.sort();
assert_eq!(got, vec![0x1111, 0x3333]);
}
// `Count` sees exactly the two gpu nodes.
#[test]
fn aggregate_count_matches_filter_len() {
let i = idx();
let pred = Predicate::exists(TagKey::new(TaxonomyAxis::Hardware, "gpu".to_string()));
let count = i.aggregate(&pred, Count::default());
assert_eq!(count, 2);
}
// `UniqueAxisValues` gathers the `memory_gb` values of the two gpu nodes.
#[test]
fn aggregate_unique_axis_values_collects_distinct() {
let i = idx();
let pred = Predicate::exists(TagKey::new(TaxonomyAxis::Hardware, "gpu".to_string()));
let agg = UniqueAxisValues::new(TaxonomyAxis::Hardware, "memory_gb");
let mut got = i.aggregate(&pred, agg);
got.sort();
assert_eq!(got, vec!["16".to_string(), "64".to_string()]);
}
// `MaxNumericMetadata` picks the largest parsed `priority_weight` (0.9) out of
// three nodes built locally for this test.
#[test]
fn aggregate_max_numeric_metadata() {
let i = Arc::new(CapabilityIndex::new());
let eid = EntityId::from_bytes([0u8; 32]);
for (id, weight) in [(0xa, "0.3"), (0xb, "0.9"), (0xc, "0.5")] {
let caps = CapabilitySet::default()
.add_tag("hardware.gpu")
.with_metadata("priority_weight", weight);
i.index(CapabilityAnnouncement::new(id, eid.clone(), 1, caps));
}
let pred = Predicate::exists(TagKey::new(TaxonomyAxis::Hardware, "gpu".to_string()));
let agg = MaxNumericMetadata::new("priority_weight");
let max = i.aggregate(&pred, agg);
assert_eq!(max, Some(0.9));
}
// A predicate on a key no node carries yields an empty filter result and a zero count.
#[test]
fn empty_match_returns_empty() {
let i = idx();
let pred = Predicate::exists(TagKey::new(
TaxonomyAxis::Hardware,
"nonexistent_key".to_string(),
));
assert!(i.filter(&pred).is_empty());
assert_eq!(i.aggregate(&pred, Count::default()), 0);
}
// `match_axis` results can be narrowed further by the caller using the
// returned capability sets (here: region metadata).
#[test]
fn composes_match_axis_with_post_filter() {
let i = idx();
let gpu_nodes = i.match_axis(TaxonomyAxis::Hardware, "gpu", None);
let us_east_gpu: Vec<u64> = gpu_nodes
.into_iter()
.filter(|(_, caps)| {
caps.metadata
.get("region")
.map(|r| r == "us-east")
.unwrap_or(false)
})
.map(|(n, _)| n)
.collect();
assert_eq!(us_east_gpu, vec![0x1111]);
}
// Helper: parses `raw` with `Tag::parse` and inserts the resulting tag directly
// into the capability set's tag collection. Panics if `raw` does not parse.
fn add_reserved_tag(caps: CapabilitySet, raw: &str) -> CapabilitySet {
let mut caps = caps;
let parsed = Tag::parse(raw).unwrap();
caps.tags.insert(parsed);
caps
}
// Chain 0xC (F2, fork-of F1) -> 0xB (F1, fork-of R) -> 0xA (R): the traversal
// starting at 0xC visits all three nodes in that order.
#[test]
fn traverse_fork_of_walks_chain() {
let i = Arc::new(CapabilityIndex::new());
let eid = EntityId::from_bytes([0u8; 32]);
let r_caps = add_reserved_tag(CapabilitySet::default(), "causal:R");
i.index(CapabilityAnnouncement::new(0xAu64, eid.clone(), 1, r_caps));
let f1_caps = add_reserved_tag(CapabilitySet::default(), "causal:F1");
let f1_caps = add_reserved_tag(f1_caps, "fork-of:R");
i.index(CapabilityAnnouncement::new(0xBu64, eid.clone(), 1, f1_caps));
let f2_caps = add_reserved_tag(CapabilitySet::default(), "causal:F2");
let f2_caps = add_reserved_tag(f2_caps, "fork-of:F1");
i.index(CapabilityAnnouncement::new(0xCu64, eid.clone(), 1, f2_caps));
let start_tag = Tag::parse("fork-of:F1").unwrap();
let path = i.traverse(0xCu64, &start_tag, EdgeKind::ForkOfParent, 5);
assert_eq!(path.len(), 3, "path: {path:?}");
assert_eq!(path[0].0, 0xC);
assert_eq!(path[1].0, 0xB);
assert_eq!(path[2].0, 0xA);
}
// `max_depth == 0` returns only the start entry; `max_depth == 1` takes one hop.
#[test]
fn traverse_honors_max_depth() {
let i = Arc::new(CapabilityIndex::new());
let eid = EntityId::from_bytes([0u8; 32]);
let r_caps = add_reserved_tag(CapabilitySet::default(), "causal:R");
i.index(CapabilityAnnouncement::new(0xAu64, eid.clone(), 1, r_caps));
let f1_caps = add_reserved_tag(CapabilitySet::default(), "causal:F1");
let f1_caps = add_reserved_tag(f1_caps, "fork-of:R");
i.index(CapabilityAnnouncement::new(0xBu64, eid.clone(), 1, f1_caps));
let start = Tag::parse("fork-of:R").unwrap();
let path = i.traverse(0xBu64, &start, EdgeKind::ForkOfParent, 0);
assert_eq!(path.len(), 1);
assert_eq!(path[0].0, 0xB);
let path = i.traverse(0xBu64, &start, EdgeKind::ForkOfParent, 1);
assert_eq!(path.len(), 2);
assert_eq!(path[1].0, 0xA);
}
// The root node 0xA has no `fork-of:` tag, so the walk ends there even with a
// generous depth budget.
#[test]
fn traverse_terminates_at_root() {
let i = Arc::new(CapabilityIndex::new());
let eid = EntityId::from_bytes([0u8; 32]);
let r_caps = add_reserved_tag(CapabilitySet::default(), "causal:R");
i.index(CapabilityAnnouncement::new(0xAu64, eid.clone(), 1, r_caps));
let f1_caps = add_reserved_tag(CapabilitySet::default(), "causal:F1");
let f1_caps = add_reserved_tag(f1_caps, "fork-of:R");
i.index(CapabilityAnnouncement::new(0xBu64, eid.clone(), 1, f1_caps));
let start = Tag::parse("fork-of:R").unwrap();
let path = i.traverse(0xBu64, &start, EdgeKind::ForkOfParent, 100);
assert_eq!(path.len(), 2);
assert_eq!(path[1].0, 0xA);
}
// Two nodes that name each other as parent form a cycle; the traversal must
// stop and never list the same node twice.
#[test]
fn traverse_halts_on_fork_of_cycle() {
let i = Arc::new(CapabilityIndex::new());
let eid = EntityId::from_bytes([0u8; 32]);
let a_caps = add_reserved_tag(CapabilitySet::default(), "causal:CA");
let a_caps = add_reserved_tag(a_caps, "fork-of:CB");
i.index(CapabilityAnnouncement::new(0xAu64, eid.clone(), 1, a_caps));
let b_caps = add_reserved_tag(CapabilitySet::default(), "causal:CB");
let b_caps = add_reserved_tag(b_caps, "fork-of:CA");
i.index(CapabilityAnnouncement::new(0xBu64, eid.clone(), 1, b_caps));
let start = Tag::parse("fork-of:CB").unwrap();
let path = i.traverse(0xAu64, &start, EdgeKind::ForkOfParent, 64);
assert!(
path.len() <= 3,
"cycle did not terminate; path has {} entries: {:?}",
path.len(),
path
);
let mut seen = std::collections::BTreeSet::new();
for (id, _) in &path {
assert!(
seen.insert(*id),
"node 0x{id:X} visited twice in path: {path:?}"
);
}
}
// When no node hosts the `causal:` tag named by the edge, only the start
// entry is returned.
#[test]
fn traverse_halts_when_parent_chain_unknown() {
let i = Arc::new(CapabilityIndex::new());
let eid = EntityId::from_bytes([0u8; 32]);
let f1_caps = add_reserved_tag(CapabilitySet::default(), "causal:F1");
let f1_caps = add_reserved_tag(f1_caps, "fork-of:UNKNOWN_PARENT");
i.index(CapabilityAnnouncement::new(0xBu64, eid.clone(), 1, f1_caps));
let start = Tag::parse("fork-of:UNKNOWN_PARENT").unwrap();
let path = i.traverse(0xBu64, &start, EdgeKind::ForkOfParent, 5);
assert_eq!(path.len(), 1);
}
// Three fixture nodes carry `memory_gb`; with RTTs of 1 ms, 5 ms and 50 ms they
// come back ordered nearest first.
#[test]
fn nearest_ranks_by_rtt_ascending() {
let i = idx();
let pred = Predicate::exists(TagKey::new(TaxonomyAxis::Hardware, "memory_gb".to_string()));
let rtts = |id: u64| -> Option<Duration> {
match id {
0x1111 => Some(Duration::from_millis(5)),
0x2222 => Some(Duration::from_millis(50)),
0x3333 => Some(Duration::from_millis(1)),
0x4444 => Some(Duration::from_millis(100)),
_ => None,
}
};
let top = i.nearest(&pred, rtts, 3);
let ids: Vec<u64> = top.iter().map(|(n, _, _)| *n).collect();
assert_eq!(ids, vec![0x3333, 0x1111, 0x2222]);
}
// `n == 0` yields nothing; an `n` larger than the match count yields all
// three matching nodes.
#[test]
fn nearest_truncates_at_n() {
let i = idx();
let pred = Predicate::exists(TagKey::new(TaxonomyAxis::Hardware, "memory_gb".to_string()));
let rtts = |_: u64| Some(Duration::from_millis(1));
assert!(i.nearest(&pred, rtts, 0).is_empty());
let all = i.nearest(&pred, rtts, 100);
assert_eq!(all.len(), 3);
}
// The only measured node (0x2222) ranks first; the unmeasured ones follow in
// ascending node-id order.
#[test]
fn nearest_unmeasured_candidates_sort_last() {
let i = idx();
let pred = Predicate::exists(TagKey::new(TaxonomyAxis::Hardware, "memory_gb".to_string()));
let rtts = |id: u64| -> Option<Duration> {
if id == 0x2222 {
Some(Duration::from_millis(10))
} else {
None
}
};
let ranked = i.nearest(&pred, rtts, 100);
let ids: Vec<u64> = ranked.iter().map(|(n, _, _)| *n).collect();
assert_eq!(ids[0], 0x2222);
assert!(ids[1] <= ids[2]);
}
}