use std::collections::{HashMap, HashSet};
use serde::{Deserialize, Serialize};
use super::capability::{CapabilityFold, CapabilityMembership, NodeState};
use super::state::NodeId;
use super::Fold;
use crate::adapter::net::behavior::tag::{Tag, TaxonomyAxis};
/// A predicate over the raw tag strings of a capability entry.
///
/// This is part of the query wire format: it is (de)serialized as an
/// internally tagged object whose `"kind"` field is the `snake_case` variant
/// name, e.g. `{"kind":"prefix","value":"hardware.gpu"}`. Field declaration
/// order determines serialized field order, so keep it stable.
///
/// Queries treat an entry as matching when at least one of its tags
/// satisfies the matcher.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum TagMatcher {
    /// The tag equals `value` byte-for-byte.
    Exact {
        value: String,
    },
    /// The tag starts with `value`. This is a plain string prefix with no
    /// segment-boundary check, so `"hardware.gpu"` also matches
    /// `"hardware.gpux"`.
    Prefix {
        value: String,
    },
    /// The tag parses as a [`Tag`] whose axis key lies on `axis`.
    Axis {
        axis: TaxonomyAxis,
    },
    /// The tag parses as a [`Tag`] whose axis key has exactly this `axis` and
    /// `key` (e.g. `Hardware` + `"gpu.count"` matches `hardware.gpu.count=8`).
    AxisKey {
        axis: TaxonomyAxis,
        key: String,
    },
    /// The raw tag matches the regular expression `pattern` (unanchored
    /// unless the pattern itself anchors). A pattern that fails to compile
    /// matches nothing.
    Regex {
        pattern: String,
    },
    /// The tag is an `axis.key=value` tag whose value parses as a semver
    /// version within `[min, max]` (both inclusive; `None` leaves that side
    /// open). `axis_key` is `"<axis>.<key>"`, split at the first `.`
    /// (e.g. `"software.python"`). An unsplittable `axis_key` or an unknown
    /// axis prefix matches nothing, as do tag values that are not valid semver.
    VersionRange {
        axis_key: String,
        min: Option<String>,
        max: Option<String>,
    },
}
impl TagMatcher {
    /// Compile this matcher into a [`CompiledMatcher`] that borrows from `self`.
    ///
    /// Expensive parsing (regex compilation, semver bound parsing and
    /// `axis.key` splitting) happens once here instead of once per tag.
    /// Malformed parameters never panic. Instead they produce a matcher that
    /// rejects every tag:
    /// - a `Regex` pattern that does not compile;
    /// - a `VersionRange` whose `axis_key` does not split into a known axis
    ///   and a key;
    /// - a `VersionRange` bound that is supplied but is not valid semver.
    fn compile(&self) -> CompiledMatcher<'_> {
        match self {
            Self::Exact { value } => CompiledMatcher::Exact { value },
            Self::Prefix { value } => CompiledMatcher::Prefix { value },
            Self::Axis { axis } => CompiledMatcher::Axis { axis: *axis },
            Self::AxisKey { axis, key } => CompiledMatcher::AxisKey { axis: *axis, key },
            Self::Regex { pattern } => CompiledMatcher::Regex {
                re: regex::Regex::new(pattern).ok(),
            },
            Self::VersionRange { axis_key, min, max } => {
                let Some((axis, key)) = split_axis_key(axis_key) else {
                    return CompiledMatcher::MatchesNothing;
                };
                // `Ok(None)` means "no bound" and `Err` means "bound given but
                // malformed". A malformed bound must not be silently dropped:
                // treating it as open would widen the query to versions the
                // caller meant to exclude.
                let parse_bound = |bound: &Option<String>| {
                    bound.as_deref().map(semver::Version::parse).transpose()
                };
                match (parse_bound(min), parse_bound(max)) {
                    (Ok(min), Ok(max)) => CompiledMatcher::VersionRange {
                        axis,
                        key,
                        min,
                        max,
                    },
                    _ => CompiledMatcher::MatchesNothing,
                }
            }
        }
    }
}
/// Query-time form of a [`TagMatcher`], built once per query by
/// `TagMatcher::compile` so that regexes, semver bounds and `axis.key`
/// splits are not re-parsed for every tag. String payloads borrow from the
/// originating `TagMatcher`.
enum CompiledMatcher<'a> {
    /// Rejects every tag. Used when a matcher's parameters are unusable
    /// (e.g. a `VersionRange` with a malformed `axis_key`).
    MatchesNothing,
    /// Tag must equal `value` exactly.
    Exact { value: &'a str },
    /// Tag must start with `value` (raw string prefix).
    Prefix { value: &'a str },
    /// Tag must parse and carry an axis key on `axis`.
    Axis { axis: TaxonomyAxis },
    /// Tag must parse and carry exactly this axis and key.
    AxisKey { axis: TaxonomyAxis, key: &'a str },
    /// `None` when the source pattern failed to compile, which matches nothing.
    Regex { re: Option<regex::Regex> },
    /// Tag must be `axis.key=value` with a semver value inside the inclusive
    /// bounds. A `None` bound is open.
    VersionRange {
        axis: TaxonomyAxis,
        key: &'a str,
        min: Option<semver::Version>,
        max: Option<semver::Version>,
    },
}
impl CompiledMatcher<'_> {
fn matches_any(&self, tags: &[String]) -> bool {
tags.iter().any(|t| self.matches_one(t))
}
fn matches_one(&self, raw: &str) -> bool {
match self {
Self::Exact { value } => raw == *value,
Self::Prefix { value } => raw.starts_with(value),
Self::Axis { axis } => {
Tag::parse(raw)
.ok()
.and_then(|t| t.axis_key().map(|k| k.axis))
== Some(*axis)
}
Self::AxisKey { axis, key } => Tag::parse(raw)
.ok()
.and_then(|t| t.axis_key())
.is_some_and(|k| k.axis == *axis && k.key == *key),
Self::Regex { re } => re.as_ref().is_some_and(|r| r.is_match(raw)),
Self::VersionRange {
axis,
key,
min,
max,
} => {
let Some(value) = axis_value_for(raw, *axis, key) else {
return false;
};
let Ok(parsed) = semver::Version::parse(&value) else {
return false;
};
if let Some(lo) = min.as_ref() {
if parsed < *lo {
return false;
}
}
if let Some(hi) = max.as_ref() {
if parsed > *hi {
return false;
}
}
true
}
Self::MatchesNothing => false,
}
}
}
/// How `aggregate` / `capacity_ranking` assign entries to buckets.
///
/// Serialized like [`TagMatcher`]: an internally tagged object with a
/// `snake_case` `"kind"` field. Defaults to [`GroupBy::Class`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum GroupBy {
    /// One bucket per class hash, rendered as lowercase hex (e.g. `"0x100"`).
    #[default]
    Class,
    /// One bucket per node state: `"idle"`, `"busy"`, `"reserved"` or `"faulty"`.
    State,
    /// One bucket per region. Entries without a region go to `"(none)"`.
    Region,
    /// One bucket per publisher node id, rendered as lowercase hex (e.g. `"0xa"`).
    Publisher,
    /// One bucket per dotted segment directly after `prefix` in each tag; the
    /// segment ends at `.`, `=` or `:`. A tag equal to `prefix` lands in
    /// `"(present)"`. An entry may fall into several buckets, or none.
    TagStem {
        prefix: String,
    },
    /// One bucket per value of the `axis`/`key` tag (`axis.key=value`). An
    /// entry may fall into several buckets, or none.
    TagValue {
        axis: TaxonomyAxis,
        key: String,
    },
}
impl GroupBy {
    /// Computes the bucket key(s) a membership entry falls into.
    ///
    /// The scalar groupings (`Class`, `State`, `Region`, `Publisher`) always
    /// yield exactly one key. The tag-derived groupings yield one key per
    /// distinct stem or value found among the entry's tags, sorted and
    /// de-duplicated. That list may be empty, and callers in this module skip
    /// such entries.
    fn bucket_keys(&self, membership: &CapabilityMembership, publisher: NodeId) -> Vec<String> {
        let single_key = match self {
            Self::Class => format!("0x{:x}", membership.class_hash),
            Self::State => state_label(membership.state).to_string(),
            Self::Region => membership
                .region
                .as_deref()
                .unwrap_or("(none)")
                .to_string(),
            Self::Publisher => format!("0x{:x}", publisher),
            Self::TagStem { prefix } => {
                let stems = membership
                    .tags
                    .iter()
                    .filter_map(|tag| tag_stem_after(tag, prefix));
                return Self::sorted_unique(stems);
            }
            Self::TagValue { axis, key } => {
                let values = membership
                    .tags
                    .iter()
                    .filter_map(|tag| axis_value_for(tag, *axis, key));
                return Self::sorted_unique(values);
            }
        };
        vec![single_key]
    }

    /// Collects `values` into a vector sorted ascending with duplicates removed.
    fn sorted_unique(values: impl Iterator<Item = String>) -> Vec<String> {
        let mut out: Vec<String> = values.collect();
        out.sort_unstable();
        out.dedup();
        out
    }
}
/// The per-bucket reduction computed by `aggregate`.
///
/// Serialized as an internally tagged object with a `snake_case` `"kind"`
/// field. The numeric variants take an `axis_key` of the form
/// `"<axis>.<key>"`, split at the first `.` (e.g. `"hardware.gpu.count"`).
/// Matching tag values are parsed as `u64`, and unparseable ones are skipped.
/// If `axis_key` cannot be split into a known axis and a key, every bucket
/// reports `0`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum Aggregation {
    /// Number of entries in the bucket.
    Count,
    /// Number of distinct publishers among the bucket's entries.
    DistinctPublishers,
    /// Number of distinct values of the `axis`/`key` tag across the bucket.
    DistinctValues {
        axis: TaxonomyAxis,
        key: String,
    },
    /// Saturating sum of the numeric tag over the bucket.
    SumNumericTag {
        axis_key: String,
    },
    /// Smallest numeric tag value in the bucket, or `0` if there is none.
    MinNumericTag {
        axis_key: String,
    },
    /// Largest numeric tag value in the bucket, or `0` if there is none.
    MaxNumericTag {
        axis_key: String,
    },
}
/// Query-time form of an [`Aggregation`]. Any `axis_key` is already split so
/// it is not re-parsed per entry. Key strings borrow from the source
/// `Aggregation`.
#[derive(Clone, Copy)]
enum CompiledAgg<'a> {
    /// A numeric aggregation whose `axis_key` was malformed; contributes nothing.
    Inert,
    /// Bucket value is the entry count.
    Count,
    /// Bucket value is the number of distinct publishers.
    DistinctPublishers,
    /// Collect the distinct values of the `axis`/`key` tag.
    DistinctValues { axis: TaxonomyAxis, key: &'a str },
    /// Shared compiled form of Sum/Min/Max. The reduction actually reported
    /// is chosen from the original `Aggregation`.
    Numeric { axis: TaxonomyAxis, key: &'a str },
}
impl<'a> CompiledAgg<'a> {
    /// Lowers `agg` into its compiled form.
    ///
    /// The three numeric aggregations collapse into `Numeric`. If their
    /// `axis_key` does not split into a known axis and a key, the result is
    /// `Inert` rather than an error.
    fn compile(agg: &'a Aggregation) -> CompiledAgg<'a> {
        let numeric_axis_key = match agg {
            Aggregation::Count => return CompiledAgg::Count,
            Aggregation::DistinctPublishers => return CompiledAgg::DistinctPublishers,
            Aggregation::DistinctValues { axis, key } => {
                return CompiledAgg::DistinctValues { axis: *axis, key };
            }
            Aggregation::SumNumericTag { axis_key }
            | Aggregation::MinNumericTag { axis_key }
            | Aggregation::MaxNumericTag { axis_key } => axis_key,
        };
        split_axis_key(numeric_axis_key)
            .map_or(CompiledAgg::Inert, |(axis, key)| CompiledAgg::Numeric { axis, key })
    }
}
impl Fold<CapabilityFold> {
pub fn aggregate(
&self,
matcher: Option<TagMatcher>,
group_by: GroupBy,
agg: Aggregation,
) -> Vec<(String, u64)> {
let mut buckets: HashMap<String, BucketAccum> = HashMap::new();
let compiled = matcher.as_ref().map(TagMatcher::compile);
let compiled_agg = CompiledAgg::compile(&agg);
self.with_state(|state| {
for ((_class, publisher), entry) in state.entries.iter() {
let membership = &entry.payload;
if let Some(m) = &compiled {
if !m.matches_any(&membership.tags) {
continue;
}
}
let keys = group_by.bucket_keys(membership, *publisher);
if keys.is_empty() {
continue;
}
for key in keys {
let slot = buckets.entry(key).or_default();
slot.count = slot.count.saturating_add(1);
slot.publishers.insert(*publisher);
match compiled_agg {
CompiledAgg::DistinctValues { axis, key: k } => {
for raw in &membership.tags {
if let Some(v) = axis_value_for(raw, axis, k) {
slot.distinct_values.insert(v);
}
}
}
CompiledAgg::Numeric { axis, key: k } => {
for raw in &membership.tags {
if let Some(n) = numeric_value_for_split(raw, axis, k) {
slot.numeric_sum = slot.numeric_sum.saturating_add(n);
slot.numeric_min =
Some(slot.numeric_min.map_or(n, |cur| cur.min(n)));
slot.numeric_max =
Some(slot.numeric_max.map_or(n, |cur| cur.max(n)));
}
}
}
CompiledAgg::Count
| CompiledAgg::DistinctPublishers
| CompiledAgg::Inert => {}
}
}
}
});
let mut rows: Vec<(String, u64)> = buckets
.into_iter()
.map(|(bucket, slot)| {
let v: u64 = match &agg {
Aggregation::Count => slot.count,
Aggregation::DistinctPublishers => slot.publishers.len() as u64,
Aggregation::DistinctValues { .. } => slot.distinct_values.len() as u64,
Aggregation::SumNumericTag { .. } => slot.numeric_sum,
Aggregation::MinNumericTag { .. } => slot.numeric_min.unwrap_or(0),
Aggregation::MaxNumericTag { .. } => slot.numeric_max.unwrap_or(0),
};
(bucket, v)
})
.collect();
rows.sort_by(|a, b| a.0.cmp(&b.0));
rows
}
pub fn capacity_ranking<R>(&self, query: CapacityQuery, rtt_lookup: R) -> Vec<CapacityRow>
where
R: Fn(NodeId) -> Option<u32>,
{
let mut buckets: HashMap<String, CapacityAccum> = HashMap::new();
let compiled_matcher = query.matcher.as_ref().map(TagMatcher::compile);
let sum_axis_split: Option<(TaxonomyAxis, &str)> =
query.sum_axis_key.as_deref().and_then(split_axis_key);
self.with_state(|state| {
for ((_class, publisher), entry) in state.entries.iter() {
let membership = &entry.payload;
if membership.state == NodeState::Faulty {
continue;
}
if let Some(m) = &compiled_matcher {
if !m.matches_any(&membership.tags) {
continue;
}
}
if let Some(max) = query.max_rtt_ms {
let Some(rtt) = rtt_lookup(*publisher) else {
continue;
};
if rtt > max {
continue;
}
}
let keys = query.group_by.bucket_keys(membership, *publisher);
if keys.is_empty() {
continue;
}
let entry_capacity: Option<u64> = sum_axis_split.map(|(axis, key)| {
membership
.tags
.iter()
.filter_map(|t| numeric_value_for_split(t, axis, key))
.fold(0u64, |acc, n| acc.saturating_add(n))
});
for key in keys {
let slot = buckets.entry(key).or_default();
match membership.state {
NodeState::Idle => slot.idle = slot.idle.saturating_add(1),
NodeState::Busy => slot.busy = slot.busy.saturating_add(1),
NodeState::Reserved => slot.reserved = slot.reserved.saturating_add(1),
NodeState::Faulty => unreachable!("filtered above"),
}
if let Some(c) = entry_capacity {
slot.summed_capacity =
Some(slot.summed_capacity.unwrap_or(0).saturating_add(c));
}
}
}
});
let mut rows: Vec<CapacityRow> = buckets
.into_iter()
.map(|(bucket, slot)| {
let available = slot
.idle
.saturating_add(slot.busy)
.saturating_add(slot.reserved);
CapacityRow {
bucket,
idle: slot.idle,
busy: slot.busy,
reserved: slot.reserved,
available,
summed_capacity: slot.summed_capacity,
}
})
.collect();
rows.sort_by(|a, b| b.available.cmp(&a.available).then(a.bucket.cmp(&b.bucket)));
if query.limit > 0 && rows.len() > query.limit {
rows.truncate(query.limit);
}
rows
}
}
/// Parameters for `capacity_ranking`. It is serde-serializable (JSON
/// round-trips losslessly).
///
/// `Default` yields no matcher, grouping by class, no RTT filter, no
/// capacity sum and no row limit.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct CapacityQuery {
    /// Optional tag filter; entries with no matching tag are skipped.
    pub matcher: Option<TagMatcher>,
    /// How surviving entries are bucketed.
    pub group_by: GroupBy,
    /// When set, entries whose publisher RTT (from the caller's lookup) is
    /// unknown or greater than this value are dropped. Milliseconds per the
    /// field name; the value is compared directly against the lookup's result.
    pub max_rtt_ms: Option<u32>,
    /// Optional `"<axis>.<key>"` numeric tag whose values are summed into
    /// `CapacityRow::summed_capacity`.
    pub sum_axis_key: Option<String>,
    /// Maximum number of rows returned; `0` means unlimited.
    pub limit: usize,
}
/// One bucket of a `capacity_ranking` result.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CapacityRow {
    /// Bucket key, as produced by the query's `GroupBy`.
    pub bucket: String,
    /// Number of entries in the bucket in `NodeState::Idle`.
    pub idle: u64,
    /// Number of entries in the bucket in `NodeState::Busy`.
    pub busy: u64,
    /// Number of entries in the bucket in `NodeState::Reserved`.
    pub reserved: u64,
    /// `idle + busy + reserved` (saturating): every non-faulty entry in the
    /// bucket. Rows are ranked by this value.
    pub available: u64,
    /// Sum of the `sum_axis_key` tag over the bucket's entries; `None` when
    /// no valid key was requested.
    pub summed_capacity: Option<u64>,
}
/// Running per-bucket state for `aggregate`. Which fields are eventually
/// read depends on the requested `Aggregation`.
#[derive(Default)]
struct BucketAccum {
    /// Number of entries that landed in this bucket.
    count: u64,
    /// Saturating sum of the numeric tag values seen.
    numeric_sum: u64,
    /// Smallest numeric tag value seen; `None` until the first one.
    numeric_min: Option<u64>,
    /// Largest numeric tag value seen; `None` until the first one.
    numeric_max: Option<u64>,
    /// Publisher ids recorded for this bucket (read by `DistinctPublishers`).
    publishers: HashSet<NodeId>,
    /// Distinct tag values collected for `DistinctValues`.
    distinct_values: HashSet<String>,
}
/// Running per-bucket state for `capacity_ranking`.
#[derive(Default)]
struct CapacityAccum {
    /// Running sum of entries' numeric capacity tags. Stays `None` unless a
    /// valid `sum_axis_key` was requested.
    summed_capacity: Option<u64>,
    /// Number of idle entries.
    idle: u64,
    /// Number of busy entries.
    busy: u64,
    /// Number of reserved entries.
    reserved: u64,
}
/// Stable lowercase label for a node state.
///
/// These labels are emitted verbatim as `GroupBy::State` bucket keys, so
/// changing them changes query output.
fn state_label(state: NodeState) -> &'static str {
    match state {
        NodeState::Faulty => "faulty",
        NodeState::Reserved => "reserved",
        NodeState::Busy => "busy",
        NodeState::Idle => "idle",
    }
}
/// Extracts the dotted segment that follows `prefix` in `tag`.
///
/// - `tag == prefix` yields `"(present)"`.
/// - `prefix.<stem>...` yields `<stem>`, where the stem ends at the next `.`,
///   `=` or `:`.
/// - Anything else (no prefix match, no `.` boundary right after the prefix,
///   or an empty stem) yields `None`.
fn tag_stem_after(tag: &str, prefix: &str) -> Option<String> {
    let remainder = tag.strip_prefix(prefix)?;
    if remainder.is_empty() {
        return Some("(present)".to_string());
    }
    // Require a segment boundary so "hardware.gpux" is not a stem of
    // "hardware.gpu".
    let segment = remainder.strip_prefix('.')?;
    let stem = segment.split(['.', '=', ':']).next().unwrap_or_default();
    (!stem.is_empty()).then(|| stem.to_string())
}
/// Returns the value of `raw` when it parses as an `axis.key=value` tag on
/// exactly `want_axis` / `want_key`. Otherwise returns `None`, including when
/// `raw` does not parse.
fn axis_value_for(raw: &str, want_axis: TaxonomyAxis, want_key: &str) -> Option<String> {
    if let Ok(Tag::AxisValue {
        axis, key, value, ..
    }) = Tag::parse(raw)
    {
        if axis == want_axis && key == want_key {
            return Some(value);
        }
    }
    None
}
/// Like `axis_value_for`, but additionally requires the value to parse as a
/// `u64`.
fn numeric_value_for_split(raw: &str, axis: TaxonomyAxis, key: &str) -> Option<u64> {
    let text = axis_value_for(raw, axis, key)?;
    text.parse().ok()
}
/// Splits `"<axis>.<key>"` at the first `.` into a known taxonomy axis and
/// the remaining key (which may itself contain dots, e.g. `"gpu.count"`).
/// Returns `None` when there is no `.` or the axis prefix is unknown.
fn split_axis_key(want_axis_key: &str) -> Option<(TaxonomyAxis, &str)> {
    let (axis_prefix, key) = want_axis_key.split_once('.')?;
    TaxonomyAxis::from_prefix(axis_prefix).map(|axis| (axis, key))
}
// Unit tests for the aggregation / capacity-ranking query surface.
//
// Most tests share `populated_fold()`, a three-entry fixture:
//   0xA: class 0x100, Idle, us-east, gpu h100, gpu.count=8, python=3.11
//   0xB: class 0x100, Busy, us-east, gpu h100, gpu.count=4, python=3.12
//   0xC: class 0x200, Idle, us-west, gpu a100, gpu.count=2, python=3.11
#[cfg(test)]
mod tests {
    use super::*;
    use crate::adapter::net::behavior::fold::wire::SignedAnnouncement;
    use crate::adapter::net::behavior::fold::EnvelopeMeta;
    use crate::adapter::net::behavior::fold::FoldKind;
    use crate::adapter::net::identity::EntityKeypair;
    use std::collections::BTreeMap;
    use std::time::Duration;

    /// Fresh, empty capability fold for a test.
    fn new_fold() -> Fold<CapabilityFold> {
        Fold::<CapabilityFold>::with_sweep_interval(Duration::ZERO)
    }

    /// Builds a signed `CapabilityMembership` announcement from `publisher`
    /// in `class` with the given tags, state and region. All other membership
    /// fields are empty / `None`. Panics if signing fails.
    fn sign(
        kp: &EntityKeypair,
        publisher: NodeId,
        class: u64,
        tags: &[&str],
        state: NodeState,
        region: Option<&str>,
    ) -> SignedAnnouncement<CapabilityMembership> {
        SignedAnnouncement::sign(
            kp,
            CapabilityFold::KIND_ID,
            class,
            publisher,
            1,
            EnvelopeMeta::default(),
            CapabilityMembership {
                class_hash: class,
                tags: tags.iter().map(|s| (*s).to_string()).collect(),
                hardware: None,
                state,
                region: region.map(|s| s.to_string()),
                price_quote: None,
                reflex_addr: None,
                allowed_nodes: Vec::new(),
                allowed_subnets: Vec::new(),
                allowed_groups: Vec::new(),
                metadata: BTreeMap::new(),
            },
        )
        .expect("sign")
    }

    /// The shared three-entry fixture described at the top of this module.
    fn populated_fold() -> Fold<CapabilityFold> {
        let fold = new_fold();
        let kp = EntityKeypair::generate();
        fold.apply(sign(
            &kp,
            0xA,
            0x100,
            &[
                "hardware.gpu",
                "hardware.gpu.h100",
                "hardware.gpu.count=8",
                "software.python=3.11",
            ],
            NodeState::Idle,
            Some("us-east"),
        ))
        .unwrap();
        fold.apply(sign(
            &kp,
            0xB,
            0x100,
            &[
                "hardware.gpu",
                "hardware.gpu.h100",
                "hardware.gpu.count=4",
                "software.python=3.12",
            ],
            NodeState::Busy,
            Some("us-east"),
        ))
        .unwrap();
        fold.apply(sign(
            &kp,
            0xC,
            0x200,
            &[
                "hardware.gpu",
                "hardware.gpu.a100",
                "hardware.gpu.count=2",
                "software.python=3.11",
            ],
            NodeState::Idle,
            Some("us-west"),
        ))
        .unwrap();
        fold
    }

    // Only 0xA and 0xC carry exactly `software.python=3.11`.
    #[test]
    fn matcher_exact_picks_only_exact_tag() {
        let fold = populated_fold();
        let rows = fold.aggregate(
            Some(TagMatcher::Exact {
                value: "software.python=3.11".into(),
            }),
            GroupBy::Publisher,
            Aggregation::Count,
        );
        let publishers: Vec<&str> = rows.iter().map(|(b, _)| b.as_str()).collect();
        assert_eq!(publishers, vec!["0xa", "0xc"]);
    }

    // Every fixture entry has at least one tag starting with `hardware.gpu`.
    #[test]
    fn matcher_prefix_picks_everything_under_the_prefix() {
        let fold = populated_fold();
        let rows = fold.aggregate(
            Some(TagMatcher::Prefix {
                value: "hardware.gpu".into(),
            }),
            GroupBy::Publisher,
            Aggregation::Count,
        );
        assert_eq!(rows.len(), 3);
    }

    #[test]
    fn matcher_axis_picks_every_entry_in_that_axis() {
        let fold = populated_fold();
        let rows = fold.aggregate(
            Some(TagMatcher::Axis {
                axis: TaxonomyAxis::Hardware,
            }),
            GroupBy::Publisher,
            Aggregation::Count,
        );
        assert_eq!(rows.len(), 3, "every entry has a hardware.* tag");
    }

    // `hardware.gpu.count` and `software.python` appear on all three entries;
    // a key no entry carries matches nothing.
    #[test]
    fn matcher_axis_key_picks_only_entries_with_that_key() {
        let fold = populated_fold();
        let rows = fold.aggregate(
            Some(TagMatcher::AxisKey {
                axis: TaxonomyAxis::Hardware,
                key: "gpu.count".into(),
            }),
            GroupBy::Publisher,
            Aggregation::Count,
        );
        assert_eq!(rows.len(), 3);
        let rows = fold.aggregate(
            Some(TagMatcher::AxisKey {
                axis: TaxonomyAxis::Software,
                key: "python".into(),
            }),
            GroupBy::Publisher,
            Aggregation::Count,
        );
        assert_eq!(rows.len(), 3);
        let rows = fold.aggregate(
            Some(TagMatcher::AxisKey {
                axis: TaxonomyAxis::Hardware,
                key: "nonexistent".into(),
            }),
            GroupBy::Publisher,
            Aggregation::Count,
        );
        assert!(rows.is_empty());
    }

    #[test]
    fn no_matcher_includes_every_entry() {
        let fold = populated_fold();
        let rows = fold.aggregate(None, GroupBy::Publisher, Aggregation::Count);
        assert_eq!(rows.len(), 3);
    }

    // 0xA and 0xB share class 0x100; 0xC is alone in 0x200.
    #[test]
    fn group_by_class_buckets_by_class_hash() {
        let fold = populated_fold();
        let rows = fold.aggregate(None, GroupBy::Class, Aggregation::Count);
        assert_eq!(
            rows,
            vec![("0x100".to_string(), 2), ("0x200".to_string(), 1)]
        );
    }

    // Only states that actually occur produce buckets; the fixture has no
    // reserved or faulty entries.
    #[test]
    fn group_by_state_buckets_idle_busy_reserved_faulty() {
        let fold = populated_fold();
        let rows = fold.aggregate(None, GroupBy::State, Aggregation::Count);
        assert_eq!(rows, vec![("busy".to_string(), 1), ("idle".to_string(), 2)]);
    }

    // An entry without a region lands in the literal `(none)` bucket, which
    // sorts before the lowercase region names.
    #[test]
    fn group_by_region_renders_none_as_explicit_string() {
        let fold = populated_fold();
        let rows = fold.aggregate(None, GroupBy::Region, Aggregation::Count);
        assert_eq!(
            rows,
            vec![("us-east".to_string(), 2), ("us-west".to_string(), 1)]
        );
        let kp = EntityKeypair::generate();
        fold.apply(sign(&kp, 0xD, 0x300, &[], NodeState::Idle, None))
            .unwrap();
        let rows = fold.aggregate(None, GroupBy::Region, Aggregation::Count);
        assert_eq!(
            rows,
            vec![
                ("(none)".to_string(), 1),
                ("us-east".to_string(), 2),
                ("us-west".to_string(), 1),
            ]
        );
    }

    #[test]
    fn group_by_publisher_buckets_by_node_id_hex() {
        let fold = populated_fold();
        let rows = fold.aggregate(None, GroupBy::Publisher, Aggregation::Count);
        assert_eq!(
            rows,
            vec![
                ("0xa".to_string(), 1),
                ("0xb".to_string(), 1),
                ("0xc".to_string(), 1),
            ]
        );
    }

    // An entry contributes to every stem it carries, so `count` and
    // `(present)` each cover all three entries.
    #[test]
    fn group_by_tag_stem_buckets_per_dotted_stem_after_prefix() {
        let fold = populated_fold();
        let rows = fold.aggregate(
            None,
            GroupBy::TagStem {
                prefix: "hardware.gpu".into(),
            },
            Aggregation::Count,
        );
        let map: HashMap<String, u64> = rows.into_iter().collect();
        assert_eq!(map.get("h100").copied(), Some(2));
        assert_eq!(map.get("a100").copied(), Some(1));
        assert_eq!(map.get("count").copied(), Some(3));
        assert_eq!(map.get("(present)").copied(), Some(3));
    }

    #[test]
    fn group_by_tag_value_extracts_value_after_separator() {
        let fold = populated_fold();
        let rows = fold.aggregate(
            None,
            GroupBy::TagValue {
                axis: TaxonomyAxis::Software,
                key: "python".into(),
            },
            Aggregation::Count,
        );
        assert_eq!(rows, vec![("3.11".to_string(), 2), ("3.12".to_string(), 1)]);
    }

    #[test]
    fn aggregation_count_returns_entry_count_per_bucket() {
        let fold = populated_fold();
        let rows = fold.aggregate(None, GroupBy::Region, Aggregation::Count);
        assert_eq!(
            rows,
            vec![("us-east".to_string(), 2), ("us-west".to_string(), 1)]
        );
    }

    // 0xA publishes in two classes: that is two entries but one publisher.
    #[test]
    fn aggregation_distinct_publishers_dedupes_per_bucket() {
        let fold = new_fold();
        let kp = EntityKeypair::generate();
        fold.apply(sign(&kp, 0xA, 0x100, &[], NodeState::Idle, Some("us-east")))
            .unwrap();
        fold.apply(sign(&kp, 0xA, 0x200, &[], NodeState::Idle, Some("us-east")))
            .unwrap();
        fold.apply(sign(&kp, 0xB, 0x100, &[], NodeState::Idle, Some("us-east")))
            .unwrap();
        let by_count = fold.aggregate(None, GroupBy::Region, Aggregation::Count);
        assert_eq!(by_count, vec![("us-east".to_string(), 3)]);
        let by_publishers = fold.aggregate(None, GroupBy::Region, Aggregation::DistinctPublishers);
        assert_eq!(by_publishers, vec![("us-east".to_string(), 2)]);
    }

    // us-east carries python 3.11 and 3.12; us-west only 3.11.
    #[test]
    fn aggregation_distinct_values_counts_unique_values_per_bucket() {
        let fold = populated_fold();
        let rows = fold.aggregate(
            None,
            GroupBy::Region,
            Aggregation::DistinctValues {
                axis: TaxonomyAxis::Software,
                key: "python".into(),
            },
        );
        assert_eq!(
            rows,
            vec![("us-east".to_string(), 2), ("us-west".to_string(), 1)]
        );
    }

    // Filtering happens before bucketing, so us-west (a100 only) disappears.
    #[test]
    fn matcher_narrows_before_grouping() {
        let fold = populated_fold();
        let rows = fold.aggregate(
            Some(TagMatcher::Exact {
                value: "hardware.gpu.h100".into(),
            }),
            GroupBy::Region,
            Aggregation::Count,
        );
        assert_eq!(rows, vec![("us-east".to_string(), 2)]);
    }

    #[test]
    fn empty_fold_aggregates_to_empty_vec() {
        let fold = new_fold();
        let rows = fold.aggregate(None, GroupBy::Region, Aggregation::Count);
        assert!(rows.is_empty());
    }

    #[test]
    fn matcher_that_excludes_everything_returns_empty() {
        let fold = populated_fold();
        let rows = fold.aggregate(
            Some(TagMatcher::Exact {
                value: "nope".into(),
            }),
            GroupBy::Region,
            Aggregation::Count,
        );
        assert!(rows.is_empty());
    }

    #[test]
    fn tag_stem_after_handles_bare_presence_form() {
        assert_eq!(
            tag_stem_after("hardware.gpu", "hardware.gpu"),
            Some("(present)".to_string())
        );
    }

    // The stem stops at whichever of `.`, `=`, `:` comes first.
    #[test]
    fn tag_stem_after_extracts_segment_up_to_next_separator() {
        assert_eq!(
            tag_stem_after("hardware.gpu.h100", "hardware.gpu"),
            Some("h100".to_string())
        );
        assert_eq!(
            tag_stem_after("hardware.gpu.vram_gb=80", "hardware.gpu"),
            Some("vram_gb".to_string())
        );
        assert_eq!(
            tag_stem_after("hardware.gpu.count:8", "hardware.gpu"),
            Some("count".to_string())
        );
    }

    #[test]
    fn tag_stem_after_returns_none_for_non_matching_tag() {
        assert_eq!(tag_stem_after("software.python=3.11", "hardware.gpu"), None);
    }

    // us-east: 8 + 4; us-west: 2.
    #[test]
    fn aggregation_sum_numeric_tag_sums_parseable_values() {
        let fold = populated_fold();
        let rows = fold.aggregate(
            None,
            GroupBy::Region,
            Aggregation::SumNumericTag {
                axis_key: "hardware.gpu.count".into(),
            },
        );
        assert_eq!(
            rows,
            vec![("us-east".to_string(), 12), ("us-west".to_string(), 2)]
        );
    }

    // Only 0xA has a parseable count; the unparseable and missing values
    // contribute nothing but the entries still share the bucket.
    #[test]
    fn aggregation_sum_numeric_tag_skips_unparseable_and_missing() {
        let fold = new_fold();
        let kp = EntityKeypair::generate();
        fold.apply(sign(
            &kp,
            0xA,
            0x100,
            &["hardware.gpu.count=8"],
            NodeState::Idle,
            Some("r1"),
        ))
        .unwrap();
        fold.apply(sign(
            &kp,
            0xB,
            0x100,
            &["hardware.gpu.count=not-a-number"],
            NodeState::Idle,
            Some("r1"),
        ))
        .unwrap();
        fold.apply(sign(
            &kp,
            0xC,
            0x100,
            &["hardware.gpu"],
            NodeState::Idle,
            Some("r1"),
        ))
        .unwrap();
        let rows = fold.aggregate(
            None,
            GroupBy::Region,
            Aggregation::SumNumericTag {
                axis_key: "hardware.gpu.count".into(),
            },
        );
        assert_eq!(rows, vec![("r1".to_string(), 8)]);
    }

    /// RTT lookup backed by a fixed `(publisher, rtt)` table; publishers not
    /// in the table yield `None`.
    fn rtt_map(entries: &[(NodeId, u32)]) -> impl Fn(NodeId) -> Option<u32> + '_ {
        move |id| entries.iter().find(|(n, _)| *n == id).map(|(_, r)| *r)
    }

    // us-east (1 idle + 1 busy) outranks us-west (1 idle).
    #[test]
    fn capacity_ranking_breaks_down_state_per_bucket() {
        let fold = populated_fold();
        let rows = fold.capacity_ranking(
            CapacityQuery {
                group_by: GroupBy::Region,
                ..CapacityQuery::default()
            },
            |_| None,
        );
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0].bucket, "us-east");
        assert_eq!(rows[0].idle, 1);
        assert_eq!(rows[0].busy, 1);
        assert_eq!(rows[0].reserved, 0);
        assert_eq!(rows[0].available, 2);
        assert_eq!(rows[0].summed_capacity, None);
        assert_eq!(rows[1].bucket, "us-west");
        assert_eq!(rows[1].idle, 1);
        assert_eq!(rows[1].available, 1);
    }

    // A faulty 0xD in us-east must not raise the bucket above 2.
    #[test]
    fn capacity_ranking_excludes_faulty_entries() {
        let fold = populated_fold();
        let kp = EntityKeypair::generate();
        fold.apply(sign(
            &kp,
            0xD,
            0x100,
            &["hardware.gpu"],
            NodeState::Faulty,
            Some("us-east"),
        ))
        .unwrap();
        let rows = fold.capacity_ranking(
            CapacityQuery {
                group_by: GroupBy::Region,
                ..CapacityQuery::default()
            },
            |_| None,
        );
        let east = rows.iter().find(|r| r.bucket == "us-east").unwrap();
        assert_eq!(east.available, 2);
    }

    // 0xC (the only us-west entry) exceeds the 100 limit, so us-west vanishes.
    #[test]
    fn capacity_ranking_honors_max_rtt_ms() {
        let fold = populated_fold();
        let lookup = rtt_map(&[(0xA, 10), (0xB, 50), (0xC, 200)]);
        let rows = fold.capacity_ranking(
            CapacityQuery {
                group_by: GroupBy::Region,
                max_rtt_ms: Some(100),
                ..CapacityQuery::default()
            },
            &lookup,
        );
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].bucket, "us-east");
        assert_eq!(rows[0].available, 2);
    }

    // With an RTT filter set, publishers absent from the lookup are dropped.
    #[test]
    fn capacity_ranking_drops_publishers_with_unknown_rtt_when_filter_set() {
        let fold = populated_fold();
        let lookup = rtt_map(&[(0xA, 10)]);
        let rows = fold.capacity_ranking(
            CapacityQuery {
                group_by: GroupBy::Region,
                max_rtt_ms: Some(100),
                ..CapacityQuery::default()
            },
            &lookup,
        );
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].bucket, "us-east");
        assert_eq!(rows[0].available, 1, "only 0xA survived; 0xB unknown");
    }

    // Without `max_rtt_ms`, the lookup closure must never be invoked.
    #[test]
    fn capacity_ranking_no_rtt_filter_skips_lookup() {
        let fold = populated_fold();
        let calls = std::cell::Cell::new(0u32);
        let rows = fold.capacity_ranking(
            CapacityQuery {
                group_by: GroupBy::Region,
                ..CapacityQuery::default()
            },
            |_| {
                calls.set(calls.get() + 1);
                Some(0)
            },
        );
        assert_eq!(calls.get(), 0);
        assert_eq!(rows.len(), 2);
    }

    #[test]
    fn capacity_ranking_sum_axis_key_aggregates_per_bucket() {
        let fold = populated_fold();
        let rows = fold.capacity_ranking(
            CapacityQuery {
                group_by: GroupBy::Region,
                sum_axis_key: Some("hardware.gpu.count".into()),
                ..CapacityQuery::default()
            },
            |_| None,
        );
        let east = rows.iter().find(|r| r.bucket == "us-east").unwrap();
        let west = rows.iter().find(|r| r.bucket == "us-west").unwrap();
        assert_eq!(east.summed_capacity, Some(12), "0xA=8 + 0xB=4");
        assert_eq!(west.summed_capacity, Some(2), "0xC=2");
    }

    #[test]
    fn capacity_ranking_sum_axis_key_unset_keeps_field_none() {
        let fold = populated_fold();
        let rows = fold.capacity_ranking(
            CapacityQuery {
                group_by: GroupBy::Region,
                ..CapacityQuery::default()
            },
            |_| None,
        );
        for row in &rows {
            assert_eq!(row.summed_capacity, None);
        }
    }

    // eu-west and us-east tie at 3 and are ordered by name; us-west (1) is last.
    #[test]
    fn capacity_ranking_sorts_by_available_descending_then_bucket_ascending() {
        let fold = new_fold();
        let kp = EntityKeypair::generate();
        for nid in [1u64, 2, 3] {
            fold.apply(sign(&kp, nid, 0x100, &[], NodeState::Idle, Some("us-east")))
                .unwrap();
        }
        fold.apply(sign(&kp, 10, 0x100, &[], NodeState::Idle, Some("us-west")))
            .unwrap();
        for nid in [100u64, 101, 102] {
            fold.apply(sign(&kp, nid, 0x100, &[], NodeState::Idle, Some("eu-west")))
                .unwrap();
        }
        let rows = fold.capacity_ranking(
            CapacityQuery {
                group_by: GroupBy::Region,
                ..CapacityQuery::default()
            },
            |_| None,
        );
        let buckets: Vec<&str> = rows.iter().map(|r| r.bucket.as_str()).collect();
        assert_eq!(buckets, vec!["eu-west", "us-east", "us-west"]);
    }

    // `nid % 5` spreads 10 entries over 5 regions; `limit: 3` keeps three rows.
    #[test]
    fn capacity_ranking_truncates_to_limit() {
        let fold = new_fold();
        let kp = EntityKeypair::generate();
        for nid in 1u64..=10 {
            fold.apply(sign(
                &kp,
                nid,
                0x100,
                &[],
                NodeState::Idle,
                Some(&format!("region-{}", nid % 5)),
            ))
            .unwrap();
        }
        let rows = fold.capacity_ranking(
            CapacityQuery {
                group_by: GroupBy::Region,
                limit: 3,
                ..CapacityQuery::default()
            },
            |_| None,
        );
        assert_eq!(rows.len(), 3);
    }

    #[test]
    fn capacity_ranking_matcher_narrows_before_state_breakdown() {
        let fold = populated_fold();
        let rows = fold.capacity_ranking(
            CapacityQuery {
                matcher: Some(TagMatcher::Exact {
                    value: "hardware.gpu.h100".into(),
                }),
                group_by: GroupBy::Region,
                ..CapacityQuery::default()
            },
            |_| None,
        );
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].bucket, "us-east");
        assert_eq!(rows[0].idle, 1);
        assert_eq!(rows[0].busy, 1);
        assert_eq!(rows[0].available, 2);
    }

    /// Test convenience: split `want_axis_key` and extract a numeric value
    /// from `raw`; `None` if either step fails.
    fn numeric_value_for(raw: &str, want_axis_key: &str) -> Option<u64> {
        let (axis, key) = split_axis_key(want_axis_key)?;
        numeric_value_for_split(raw, axis, key)
    }

    #[test]
    fn numeric_value_for_parses_axis_value_tag() {
        assert_eq!(
            numeric_value_for("hardware.gpu.count=8", "hardware.gpu.count"),
            Some(8)
        );
        assert_eq!(
            numeric_value_for("hardware.gpu.count=garbage", "hardware.gpu.count"),
            None
        );
        assert_eq!(
            numeric_value_for("hardware.gpu", "hardware.gpu.count"),
            None
        );
        assert_eq!(
            numeric_value_for("software.python=3.11", "hardware.gpu.count"),
            None
        );
    }

    #[test]
    fn numeric_value_for_rejects_malformed_axis_key() {
        assert_eq!(numeric_value_for("hardware.gpu.count=8", "no-dot"), None);
        assert_eq!(
            numeric_value_for("hardware.gpu.count=8", "unknown.count"),
            None
        );
    }

    // Each fixture entry carries exactly one of `hardware.gpu.h100` / `.a100`.
    #[test]
    fn matcher_regex_matches_pattern_against_canonical_form() {
        let fold = populated_fold();
        let rows = fold.aggregate(
            Some(TagMatcher::Regex {
                pattern: r"^hardware\.gpu\.(h100|a100)$".into(),
            }),
            GroupBy::Publisher,
            Aggregation::Count,
        );
        assert_eq!(rows.len(), 3);
    }

    #[test]
    fn matcher_regex_with_invalid_pattern_matches_nothing() {
        let fold = populated_fold();
        let rows = fold.aggregate(
            Some(TagMatcher::Regex {
                pattern: r"[unclosed".into(),
            }),
            GroupBy::Publisher,
            Aggregation::Count,
        );
        assert!(rows.is_empty(), "invalid regex must reject everything");
    }

    // min == max == 3.11.0 selects exactly the 3.11.0 publishers (inclusive).
    #[test]
    fn matcher_version_range_picks_entries_within_inclusive_bounds() {
        let fold = new_fold();
        let kp = EntityKeypair::generate();
        for (node_id, value) in [(0xA, "3.11.0"), (0xB, "3.12.0"), (0xC, "3.11.0")] {
            fold.apply(sign(
                &kp,
                node_id,
                0x100,
                &[&format!("software.python={value}")],
                NodeState::Idle,
                None,
            ))
            .unwrap();
        }
        let rows = fold.aggregate(
            Some(TagMatcher::VersionRange {
                axis_key: "software.python".into(),
                min: Some("3.11.0".into()),
                max: Some("3.11.0".into()),
            }),
            GroupBy::Publisher,
            Aggregation::Count,
        );
        let mut publishers: Vec<&str> = rows.iter().map(|(b, _)| b.as_str()).collect();
        publishers.sort_unstable();
        assert_eq!(publishers, vec!["0xa", "0xc"]);
    }

    // Upper bound only keeps 1.0.0 and 2.5.0. Lower bound only keeps 2.5.0 and
    // 3.10.0, which relies on semver ordering rather than lexical ordering.
    // No bounds keeps all three.
    #[test]
    fn matcher_version_range_handles_unbounded_min_or_max() {
        let fold = new_fold();
        let kp = EntityKeypair::generate();
        fold.apply(sign(
            &kp,
            0xA,
            0x100,
            &["software.runtime=1.0.0"],
            NodeState::Idle,
            None,
        ))
        .unwrap();
        fold.apply(sign(
            &kp,
            0xB,
            0x100,
            &["software.runtime=2.5.0"],
            NodeState::Idle,
            None,
        ))
        .unwrap();
        fold.apply(sign(
            &kp,
            0xC,
            0x100,
            &["software.runtime=3.10.0"],
            NodeState::Idle,
            None,
        ))
        .unwrap();
        let rows = fold.aggregate(
            Some(TagMatcher::VersionRange {
                axis_key: "software.runtime".into(),
                min: None,
                max: Some("2.5.0".into()),
            }),
            GroupBy::Publisher,
            Aggregation::Count,
        );
        assert_eq!(rows.len(), 2);
        let rows = fold.aggregate(
            Some(TagMatcher::VersionRange {
                axis_key: "software.runtime".into(),
                min: Some("2.5.0".into()),
                max: None,
            }),
            GroupBy::Publisher,
            Aggregation::Count,
        );
        assert_eq!(rows.len(), 2);
        let rows = fold.aggregate(
            Some(TagMatcher::VersionRange {
                axis_key: "software.runtime".into(),
                min: None,
                max: None,
            }),
            GroupBy::Publisher,
            Aggregation::Count,
        );
        assert_eq!(rows.len(), 3);
    }

    #[test]
    fn matcher_version_range_skips_unparseable_values() {
        let fold = new_fold();
        let kp = EntityKeypair::generate();
        fold.apply(sign(
            &kp,
            0xA,
            0x100,
            &["software.runtime=not-a-version"],
            NodeState::Idle,
            None,
        ))
        .unwrap();
        let rows = fold.aggregate(
            Some(TagMatcher::VersionRange {
                axis_key: "software.runtime".into(),
                min: None,
                max: None,
            }),
            GroupBy::Publisher,
            Aggregation::Count,
        );
        assert!(rows.is_empty(), "unparseable values must be skipped");
    }

    // Both an unknown axis prefix and a dot-less axis_key compile to a
    // matcher that rejects everything.
    #[test]
    fn matcher_version_range_with_unknown_axis_prefix_matches_nothing() {
        let fold = populated_fold();
        let rows = fold.aggregate(
            Some(TagMatcher::VersionRange {
                axis_key: "garbage.runtime".into(),
                min: None,
                max: None,
            }),
            GroupBy::Publisher,
            Aggregation::Count,
        );
        assert!(rows.is_empty());
        let rows = fold.aggregate(
            Some(TagMatcher::VersionRange {
                axis_key: "no-dot-anywhere".into(),
                min: None,
                max: None,
            }),
            GroupBy::Publisher,
            Aggregation::Count,
        );
        assert!(rows.is_empty());
    }

    // us-east counts are {8, 4}; us-west is {2}.
    #[test]
    fn aggregation_min_max_numeric_tag_per_bucket() {
        let fold = populated_fold();
        let mins = fold.aggregate(
            None,
            GroupBy::Region,
            Aggregation::MinNumericTag {
                axis_key: "hardware.gpu.count".into(),
            },
        );
        assert_eq!(
            mins,
            vec![("us-east".to_string(), 4), ("us-west".to_string(), 2)]
        );
        let maxes = fold.aggregate(
            None,
            GroupBy::Region,
            Aggregation::MaxNumericTag {
                axis_key: "hardware.gpu.count".into(),
            },
        );
        assert_eq!(
            maxes,
            vec![("us-east".to_string(), 8), ("us-west".to_string(), 2)]
        );
    }

    // Pins the exact JSON wire shapes, including field order and explicit
    // `null`s. A `CapacityQuery` must also round-trip losslessly.
    #[test]
    fn serde_shapes_match_cross_binding_wire_format() {
        assert_eq!(
            serde_json::to_string(&TagMatcher::Exact {
                value: "software.python=3.11".into()
            })
            .unwrap(),
            r#"{"kind":"exact","value":"software.python=3.11"}"#,
        );
        assert_eq!(
            serde_json::to_string(&TagMatcher::Prefix {
                value: "hardware.gpu".into()
            })
            .unwrap(),
            r#"{"kind":"prefix","value":"hardware.gpu"}"#,
        );
        assert_eq!(
            serde_json::to_string(&TagMatcher::Axis {
                axis: TaxonomyAxis::Hardware
            })
            .unwrap(),
            r#"{"kind":"axis","axis":"hardware"}"#,
        );
        assert_eq!(
            serde_json::to_string(&TagMatcher::AxisKey {
                axis: TaxonomyAxis::Hardware,
                key: "gpu.count".into()
            })
            .unwrap(),
            r#"{"kind":"axis_key","axis":"hardware","key":"gpu.count"}"#,
        );
        assert_eq!(
            serde_json::to_string(&TagMatcher::Regex {
                pattern: "^a$".into()
            })
            .unwrap(),
            r#"{"kind":"regex","pattern":"^a$"}"#,
        );
        assert_eq!(
            serde_json::to_string(&TagMatcher::VersionRange {
                axis_key: "software.python".into(),
                min: Some("3.10.0".into()),
                max: None
            })
            .unwrap(),
            r#"{"kind":"version_range","axis_key":"software.python","min":"3.10.0","max":null}"#,
        );
        assert_eq!(
            serde_json::to_string(&GroupBy::Class).unwrap(),
            r#"{"kind":"class"}"#,
        );
        assert_eq!(
            serde_json::to_string(&GroupBy::TagStem {
                prefix: "hardware.gpu".into()
            })
            .unwrap(),
            r#"{"kind":"tag_stem","prefix":"hardware.gpu"}"#,
        );
        assert_eq!(
            serde_json::to_string(&GroupBy::TagValue {
                axis: TaxonomyAxis::Software,
                key: "python".into()
            })
            .unwrap(),
            r#"{"kind":"tag_value","axis":"software","key":"python"}"#,
        );
        assert_eq!(
            serde_json::to_string(&Aggregation::Count).unwrap(),
            r#"{"kind":"count"}"#,
        );
        assert_eq!(
            serde_json::to_string(&Aggregation::SumNumericTag {
                axis_key: "hardware.gpu.count".into()
            })
            .unwrap(),
            r#"{"kind":"sum_numeric_tag","axis_key":"hardware.gpu.count"}"#,
        );
        let q = CapacityQuery {
            matcher: Some(TagMatcher::Prefix {
                value: "hardware.gpu".into(),
            }),
            group_by: GroupBy::TagStem {
                prefix: "hardware.gpu".into(),
            },
            max_rtt_ms: Some(50),
            sum_axis_key: Some("hardware.gpu.count".into()),
            limit: 5,
        };
        let s = serde_json::to_string(&q).unwrap();
        let back: CapacityQuery = serde_json::from_str(&s).unwrap();
        assert_eq!(q, back);
    }

    // A bucket whose entries carry no parseable value reports 0, not an
    // absent row.
    #[test]
    fn aggregation_min_max_numeric_tag_returns_zero_for_buckets_with_no_values() {
        let fold = new_fold();
        let kp = EntityKeypair::generate();
        fold.apply(sign(
            &kp,
            0xA,
            0x100,
            &["hardware.gpu"],
            NodeState::Idle,
            Some("r1"),
        ))
        .unwrap();
        let rows = fold.aggregate(
            None,
            GroupBy::Region,
            Aggregation::MinNumericTag {
                axis_key: "hardware.gpu.count".into(),
            },
        );
        assert_eq!(
            rows,
            vec![("r1".to_string(), 0)],
            "no parseable values in bucket → 0 (per Min/MaxNumericTag doc)",
        );
    }
}