use std::collections::HashSet;
use super::super::capability::{
matches_scope, CapabilityAnnouncement, CapabilityFilter as LegacyFilter, CapabilityScope,
GpuVendor, ScopeFilter,
};
use super::capability::{
CapabilityFilter, CapabilityFold, CapabilityMembership, CapabilityQuery, HardwareSummary,
};
use super::state::FoldError;
use super::{ApplyOutcome, EnvelopeMeta, Fold, FoldKind, NodeId, NodeState, SignedAnnouncement};
/// Projects a legacy capability filter onto the fold-side [`CapabilityFilter`].
///
/// Only `require_tags` is carried across, as `tags_all`. Every other field of
/// the fold filter is left at its empty value (`None`, an empty `Vec`, or `0`).
/// The remaining legacy constraints (GPU, memory, VRAM, vendor, models, tools)
/// are not expressed here; [`membership_passes_post_filter`] checks them.
pub fn translate_filter(legacy: &LegacyFilter) -> CapabilityFilter {
    let tags_all = legacy.require_tags.clone();
    CapabilityFilter {
        class: None,
        tags_all,
        tags_any: Vec::new(),
        state: None,
        region: None,
        limit: 0,
    }
}
/// Checks the legacy constraints that [`translate_filter`] does not carry over.
///
/// Returns `true` only when every constraint set on `legacy` is satisfied by
/// `membership`:
///
/// * `require_gpu` — the hardware summary exists and reports either a non-zero
///   `gpu_count` or a `gpu_vendor`.
/// * `min_memory_gb` / `min_vram_gb` — the advertised value is at least the
///   minimum; a missing summary or missing value counts as `0`.
/// * `gpu_vendor` — the advertised vendor string matches the requested vendor;
///   a missing summary or vendor is compared as the empty string.
/// * `require_models` / `require_tools` — when a list is non-empty, at least
///   one of its entries must be present in a `CapabilitySet` built from the
///   membership tags that parse successfully. Unparseable tags are skipped.
pub fn membership_passes_post_filter(
    membership: &CapabilityMembership,
    legacy: &LegacyFilter,
) -> bool {
    // Borrow the optional hardware summary once; `Option<&T>` is `Copy`.
    let hw = membership.hardware.as_ref();

    if legacy.require_gpu
        && !matches!(hw, Some(h) if h.gpu_count > 0 || h.gpu_vendor.is_some())
    {
        return false;
    }

    if let Some(min_mem) = legacy.min_memory_gb {
        let advertised_mem = hw.and_then(|h| h.memory_gb).unwrap_or(0);
        if advertised_mem < min_mem {
            return false;
        }
    }

    if let Some(min_vram) = legacy.min_vram_gb {
        let advertised_vram = hw.and_then(|h| h.vram_gb).unwrap_or(0);
        if advertised_vram < min_vram {
            return false;
        }
    }

    if let Some(want_vendor) = legacy.gpu_vendor {
        let advertised_vendor = hw.and_then(|h| h.gpu_vendor.as_deref()).unwrap_or("");
        if !gpu_vendor_matches(advertised_vendor, want_vendor) {
            return false;
        }
    }

    let needs_models = !legacy.require_models.is_empty();
    let needs_tools = !legacy.require_tools.is_empty();
    if !(needs_models || needs_tools) {
        return true;
    }

    // The capability set is only built when a model/tool constraint is present.
    let mut caps = super::super::capability::CapabilitySet::new();
    for tag in membership
        .tags
        .iter()
        .filter_map(|s| super::super::tag::Tag::parse(s).ok())
    {
        caps.tags.insert(tag);
    }

    if needs_models && !legacy.require_models.iter().any(|m| caps.has_model(m)) {
        return false;
    }
    !needs_tools || legacy.require_tools.iter().any(|t| caps.has_tool(t))
}
/// Returns `true` when `canonical` is exactly the lowercase name associated
/// with `want` (for example `"nvidia"` for [`GpuVendor::Nvidia`]).
///
/// The comparison is case-sensitive; any other string, including the empty
/// string, does not match.
fn gpu_vendor_matches(canonical: &str, want: GpuVendor) -> bool {
    let expected = match want {
        GpuVendor::Nvidia => "nvidia",
        GpuVendor::Amd => "amd",
        GpuVendor::Intel => "intel",
        GpuVendor::Apple => "apple",
        GpuVendor::Qualcomm => "qualcomm",
        GpuVendor::Unknown => "unknown",
    };
    canonical == expected
}
/// Maps a [`GpuVendor`] to the lowercase string stored in
/// `HardwareSummary::gpu_vendor`.
///
/// The match is exhaustive on purpose: adding a vendor variant must fail to
/// compile here until a name is chosen for it.
fn gpu_vendor_canonical(vendor: GpuVendor) -> &'static str {
    match vendor {
        GpuVendor::Amd => "amd",
        GpuVendor::Apple => "apple",
        GpuVendor::Intel => "intel",
        GpuVendor::Nvidia => "nvidia",
        GpuVendor::Qualcomm => "qualcomm",
        GpuVendor::Unknown => "unknown",
    }
}
/// Translates a legacy announcement with [`translate_announcement`] and applies
/// the result to `fold`.
///
/// # Errors
///
/// Returns whatever [`FoldError`] `fold.apply` reports for the translated
/// announcement.
pub fn apply_legacy_announcement(
    fold: &Fold<CapabilityFold>,
    ann: CapabilityAnnouncement,
) -> Result<ApplyOutcome, FoldError> {
    fold.apply(translate_announcement(&ann))
}
/// Rebuilds a legacy `CapabilitySet` for `node_id` from the fold's entries.
///
/// Every entry indexed under `node_id` contributes its parseable tags and all
/// of its metadata pairs. Tags that fail to parse are skipped. When several
/// entries carry the same metadata key, the value inserted last is the one
/// that remains. A node with no indexed entries yields an empty set.
pub fn synthesize_capability_set(
    fold: &Fold<CapabilityFold>,
    node_id: NodeId,
) -> super::super::capability::CapabilitySet {
    let mut merged = super::super::capability::CapabilitySet::new();
    fold.with_state(|state| {
        if let Some(keys) = state.by_node.get(&node_id) {
            for key in keys {
                // An index key without a backing entry is simply skipped.
                let Some(entry) = state.entries.get(key) else {
                    continue;
                };
                let payload = &entry.payload;
                for tag in payload
                    .tags
                    .iter()
                    .filter_map(|raw| super::super::tag::Tag::parse(raw).ok())
                {
                    merged.tags.insert(tag);
                }
                for (name, value) in &payload.metadata {
                    merged.metadata.insert(name.clone(), value.clone());
                }
            }
        }
    });
    merged
}
/// Decides whether `caller_node` may invoke `capability_tag` on `target_node`.
///
/// The decision is made in this order:
///
/// 1. The target must have entries in the fold and at least one of them must
///    carry `capability_tag` verbatim; otherwise the answer is `false`.
/// 2. The allow-lists (`allowed_nodes`, `allowed_subnets`, `allowed_groups`)
///    of all the target's entries are pooled. If all three pools are empty,
///    the capability is unrestricted and the answer is `true`.
/// 3. If the caller's node id is in the pooled node list, the answer is `true`.
/// 4. Otherwise the caller's own entries are scanned for subnet and group
///    tags. The caller is admitted if its subnet is in the pooled subnet list
///    or any of its groups is in the pooled group list. Only the last subnet
///    tag encountered is kept. A caller with no entries in the fold is
///    rejected.
pub fn may_execute(
    fold: &Fold<CapabilityFold>,
    target_node: NodeId,
    capability_tag: &str,
    caller_node: NodeId,
) -> bool {
    fold.with_state(|state| {
        let Some(target_keys) = state.by_node.get(&target_node) else {
            return false;
        };

        // Pool the tag presence and allow-lists across all target entries.
        let mut advertised = false;
        let mut acl_nodes: Vec<u64> = Vec::new();
        let mut acl_subnets: Vec<super::super::subnet::SubnetId> = Vec::new();
        let mut acl_groups: Vec<super::super::group::GroupId> = Vec::new();
        for key in target_keys {
            if let Some(entry) = state.entries.get(key) {
                let payload = &entry.payload;
                advertised |= payload.tags.iter().any(|t| t == capability_tag);
                acl_nodes.extend(payload.allowed_nodes.iter().copied());
                acl_subnets.extend(payload.allowed_subnets.iter().copied());
                acl_groups.extend(payload.allowed_groups.iter().cloned());
            }
        }

        if !advertised {
            return false;
        }

        let unrestricted =
            acl_nodes.is_empty() && acl_subnets.is_empty() && acl_groups.is_empty();
        if unrestricted || acl_nodes.contains(&caller_node) {
            return true;
        }

        // Only a node list was configured and the caller is not on it.
        if acl_subnets.is_empty() && acl_groups.is_empty() {
            return false;
        }

        let Some(caller_keys) = state.by_node.get(&caller_node) else {
            return false;
        };
        let mut caller_subnet: Option<super::super::subnet::SubnetId> = None;
        let mut caller_groups: Vec<super::super::group::GroupId> = Vec::new();
        for key in caller_keys {
            let Some(entry) = state.entries.get(key) else {
                continue;
            };
            for raw in &entry.payload.tags {
                // A later subnet tag replaces an earlier one.
                caller_subnet = super::super::subnet::SubnetId::from_tag(raw).or(caller_subnet);
                caller_groups.extend(super::super::group::GroupId::from_tag(raw));
            }
        }

        let subnet_admitted =
            matches!(caller_subnet, Some(subnet) if acl_subnets.contains(&subnet));
        subnet_admitted || caller_groups.iter().any(|g| acl_groups.contains(g))
    })
}
/// Converts a legacy [`CapabilityAnnouncement`] into a fold-side announcement
/// carrying a [`CapabilityMembership`] payload.
///
/// Field mapping:
///
/// * `generation` is `ann.version`, raised to at least `1`.
/// * `announced_at` is `ann.timestamp_ns / 1_000`; `ttl_secs` is passed through.
/// * `tags` are the string forms of the legacy tags. `region` is the remainder
///   of the first tag that starts with `scope:region:`, if any.
/// * `hardware` summarises the legacy hardware view: the vendor of the primary
///   GPU, the GPU count (primary plus additional, saturating at `u8::MAX`),
///   the summed VRAM of all GPUs, and the memory size when it is non-zero.
/// * `class_hash` is `0`, `state` is [`NodeState::Idle`] and `price_quote` is
///   `None`; the allow-lists, `reflex_addr` and metadata are copied verbatim.
///
/// The result is built with `SignedAnnouncement::placeholder`.
/// NOTE(review): presumably this means it carries no real signature — confirm
/// against the constructor.
pub fn translate_announcement(
    ann: &CapabilityAnnouncement,
) -> SignedAnnouncement<CapabilityMembership> {
    let views = ann.capabilities.views();
    let hw_view = views.hardware();
    let primary_gpu = hw_view.gpu.as_ref();

    // Clamp before narrowing. A bare `len() as u8` wraps modulo 256, so 256
    // additional GPUs would be counted as 0 before the saturating add.
    let extra_gpus = hw_view.additional_gpus.len().min(usize::from(u8::MAX)) as u8;
    let gpu_count = u8::from(primary_gpu.is_some()).saturating_add(extra_gpus);
    let gpu_vendor = primary_gpu.map(|g| gpu_vendor_canonical(g.vendor).to_string());

    // Total VRAM across the primary and additional GPUs; `None` without GPUs.
    let vram_gb = {
        let mut total: u32 = 0;
        if let Some(g) = primary_gpu {
            total = total.saturating_add(g.vram_gb);
        }
        for g in &hw_view.additional_gpus {
            total = total.saturating_add(g.vram_gb);
        }
        (gpu_count > 0).then_some(total)
    };
    let memory_gb = (hw_view.memory_gb > 0).then_some(hw_view.memory_gb);

    // NOTE(review): this guard looks only at the primary GPU. A view with
    // additional GPUs but no primary GPU and zero memory produces no summary,
    // although `gpu_count` is non-zero. Confirm whether that is intended.
    let hardware = if primary_gpu.is_some() || memory_gb.is_some() {
        Some(HardwareSummary {
            gpu_vendor,
            gpu_count,
            memory_gb,
            vram_gb,
        })
    } else {
        None
    };

    let tags: Vec<String> = ann
        .capabilities
        .tags
        .iter()
        .map(|t| t.to_string())
        .collect();
    let region = tags
        .iter()
        .find_map(|t| t.strip_prefix("scope:region:").map(String::from));

    SignedAnnouncement::placeholder(
        CapabilityFold::KIND_ID,
        0,
        ann.node_id,
        ann.version.max(1),
        EnvelopeMeta {
            announced_at: ann.timestamp_ns / 1_000,
            ttl_secs: Some(ann.ttl_secs),
            flags: 0,
        },
        CapabilityMembership {
            class_hash: 0,
            tags,
            hardware,
            state: NodeState::Idle,
            region,
            price_quote: None,
            reflex_addr: ann.reflex_addr,
            allowed_nodes: ann.allowed_nodes.clone(),
            allowed_subnets: ann.allowed_subnets.clone(),
            allowed_groups: ann.allowed_groups.clone(),
            metadata: ann.capabilities.metadata.clone(),
        },
    )
}
/// Returns the ids of all nodes with at least one fold entry that satisfies
/// `legacy`, sorted ascending and without duplicates.
///
/// The tag requirements are pushed into the fold query via
/// [`translate_filter`]; the remaining legacy constraints are applied to each
/// returned membership with [`membership_passes_post_filter`].
pub fn find_nodes_matching(fold: &Fold<CapabilityFold>, legacy: &LegacyFilter) -> Vec<NodeId> {
    // A node can match under several classes, so collect into a set first.
    let unique: HashSet<NodeId> = fold
        .query(CapabilityQuery::Composite(translate_filter(legacy)))
        .into_iter()
        .filter_map(|((_class, node_id), membership)| {
            membership_passes_post_filter(&membership, legacy).then_some(node_id)
        })
        .collect();

    let mut nodes: Vec<NodeId> = unique.into_iter().collect();
    nodes.sort_unstable();
    nodes
}
/// Point check: does `node_id` have at least one fold entry satisfying `legacy`?
///
/// An entry qualifies when its tags contain every string in
/// `legacy.require_tags` and it passes [`membership_passes_post_filter`].
/// Returns `false` for a node with no entries in the fold.
pub fn target_matches_filter(
    fold: &Fold<CapabilityFold>,
    node_id: NodeId,
    legacy: &LegacyFilter,
) -> bool {
    fold.with_state(|state| {
        let Some(keys) = state.by_node.get(&node_id) else {
            return false;
        };
        for key in keys {
            if let Some(entry) = state.entries.get(key) {
                let membership = &entry.payload;
                let carries_required_tags = legacy
                    .require_tags
                    .iter()
                    .all(|wanted| membership.tags.iter().any(|have| have == wanted));
                if carries_required_tags && membership_passes_post_filter(membership, legacy) {
                    return true;
                }
            }
        }
        false
    })
}
/// Derives a [`CapabilityScope`] from a membership's raw tag strings.
///
/// Only tags starting with `scope:` are considered:
///
/// * `scope:subnet-local` forces [`CapabilityScope::SubnetLocal`], regardless
///   of any tenant or region tags.
/// * `scope:tenant:<id>` and `scope:region:<name>` contribute to the tenant and
///   region lists, in tag order; empty ids and names are ignored.
/// * Any other `scope:` tag (for example `scope:global`) is ignored.
///
/// With no tenants and no regions the scope is [`CapabilityScope::Global`].
pub(crate) fn scope_from_membership_tags(tags: &[String]) -> CapabilityScope {
    let mut subnet_local = false;
    let mut tenants: Vec<String> = Vec::new();
    let mut regions: Vec<String> = Vec::new();

    for body in tags.iter().filter_map(|tag| tag.strip_prefix("scope:")) {
        if body == "subnet-local" {
            subnet_local = true;
            continue;
        }
        if let Some(id) = body.strip_prefix("tenant:") {
            if !id.is_empty() {
                tenants.push(id.to_string());
            }
            continue;
        }
        if let Some(name) = body.strip_prefix("region:") {
            if !name.is_empty() {
                regions.push(name.to_string());
            }
        }
    }

    if subnet_local {
        return CapabilityScope::SubnetLocal;
    }
    if tenants.is_empty() {
        if regions.is_empty() {
            CapabilityScope::Global
        } else {
            CapabilityScope::Regions(regions)
        }
    } else if regions.is_empty() {
        CapabilityScope::Tenants(tenants)
    } else {
        CapabilityScope::TenantsAndRegions { tenants, regions }
    }
}
/// Evaluates `predicate` against every node indexed in the fold and returns the
/// nodes that satisfy it, each paired with its synthesized `CapabilitySet`.
///
/// The node ids are snapshotted first; each node's capability set is then
/// rebuilt with [`synthesize_capability_set`] and the predicate is evaluated
/// over that set's tags and metadata. Results follow the iteration order of
/// the fold's per-node index.
pub fn filter_by_predicate(
    fold: &Fold<CapabilityFold>,
    predicate: &super::super::predicate::Predicate,
) -> Vec<(NodeId, super::super::capability::CapabilitySet)> {
    let publishers: Vec<NodeId> = fold.with_state(|state| state.by_node.keys().copied().collect());
    publishers
        .into_iter()
        .filter_map(|node_id| {
            let caps = synthesize_capability_set(fold, node_id);
            // Keep the evaluation context in its own scope so its borrows of
            // `caps` end before `caps` is moved into the result.
            let accepted = {
                let owned_tags: Vec<super::super::tag::Tag> = caps.tags.iter().cloned().collect();
                let ctx = super::super::predicate::EvalContext::new(&owned_tags, &caps.metadata);
                predicate.evaluate_unplanned(&ctx)
            };
            accepted.then_some((node_id, caps))
        })
        .collect()
}
/// Scope-aware variant of [`find_nodes_matching`].
///
/// A membership is kept when it passes [`membership_passes_post_filter`] and
/// the scope derived from its tags (see [`scope_from_membership_tags`]) is
/// accepted by `matches_scope` for the requested `scope`.
/// `same_subnet_lookup` is called once per membership that survives the post
/// filter, and its result is passed to `matches_scope`.
///
/// Returns the matching node ids sorted ascending and without duplicates.
pub fn find_nodes_matching_scoped(
    fold: &Fold<CapabilityFold>,
    legacy: &LegacyFilter,
    scope: &ScopeFilter<'_>,
    same_subnet_lookup: impl Fn(NodeId) -> bool,
) -> Vec<NodeId> {
    let unique: HashSet<NodeId> = fold
        .query(CapabilityQuery::Composite(translate_filter(legacy)))
        .into_iter()
        .filter_map(|((_class, node_id), membership)| {
            if !membership_passes_post_filter(&membership, legacy) {
                return None;
            }
            let candidate_scope = scope_from_membership_tags(&membership.tags);
            let same_subnet = same_subnet_lookup(node_id);
            matches_scope(&candidate_scope, scope, same_subnet).then_some(node_id)
        })
        .collect();

    let mut nodes: Vec<NodeId> = unique.into_iter().collect();
    nodes.sort_unstable();
    nodes
}
// Unit tests for the legacy-filter and legacy-announcement bridging helpers
// defined in this module.
#[cfg(test)]
mod tests {
use super::*;
use crate::adapter::net::behavior::fold::{
EnvelopeMeta, FoldKind, NodeState, SignedAnnouncement,
};
use crate::adapter::net::identity::EntityKeypair;
use std::time::Duration;
// Builds a signed `CapabilityMembership` announcement for `node_id` under
// `class`, carrying the given tags and optional hardware summary. All other
// payload fields are empty/`None`, and the envelope metadata is the default.
// The literal `1` is presumably the generation — confirm against
// `SignedAnnouncement::sign`. Panics if signing fails.
fn sign_member(
kp: &EntityKeypair,
node_id: NodeId,
class: u64,
tags: Vec<&str>,
hardware: Option<super::super::capability::HardwareSummary>,
) -> SignedAnnouncement<CapabilityMembership> {
SignedAnnouncement::sign(
kp,
super::super::capability::CapabilityFold::KIND_ID,
class,
node_id,
1,
EnvelopeMeta::default(),
CapabilityMembership {
class_hash: class,
tags: tags.into_iter().map(String::from).collect(),
hardware,
state: NodeState::Idle,
region: None,
price_quote: None,
reflex_addr: None,
allowed_nodes: Vec::new(),
allowed_subnets: Vec::new(),
allowed_groups: Vec::new(),
metadata: std::collections::BTreeMap::new(),
},
)
.expect("sign")
}
// Creates an empty capability fold configured with a zero sweep interval.
fn new_fold() -> Fold<CapabilityFold> {
Fold::with_sweep_interval(Duration::ZERO)
}
// `translate_filter` copies `require_tags` into `tags_all` unchanged and does
// not synthesize `model:` / `tool:` tags from `require_models` /
// `require_tools`.
#[test]
fn translate_filter_passes_require_tags_through_and_defers_models_and_tools() {
let legacy = LegacyFilter {
require_tags: vec!["gpu".into()],
require_models: vec!["llama3".into()],
require_tools: vec!["ffmpeg".into()],
..LegacyFilter::default()
};
let fold_filter = translate_filter(&legacy);
assert_eq!(fold_filter.tags_all, vec!["gpu".to_string()]);
assert!(!fold_filter.tags_all.contains(&"model:llama3".to_string()));
assert!(!fold_filter.tags_all.contains(&"tool:ffmpeg".to_string()));
}
// A `require_models` constraint is satisfied by a `software.model.0.id=<name>`
// tag naming the requested model, and rejected for a different model or for a
// membership with no tags.
#[test]
fn membership_passes_post_filter_matches_models_via_canonical_tag_bundle() {
let legacy = LegacyFilter {
require_models: vec!["llama3".into()],
..LegacyFilter::default()
};
let pass = CapabilityMembership {
class_hash: 0x100,
tags: vec!["software.model.0.id=llama3".into()],
hardware: None,
state: NodeState::Idle,
region: None,
price_quote: None,
reflex_addr: None,
allowed_nodes: Vec::new(),
allowed_subnets: Vec::new(),
allowed_groups: Vec::new(),
metadata: std::collections::BTreeMap::new(),
};
assert!(membership_passes_post_filter(&pass, &legacy));
let fail = CapabilityMembership {
tags: vec!["software.model.0.id=mistral".into()],
..pass.clone()
};
assert!(!membership_passes_post_filter(&fail, &legacy));
let bare = CapabilityMembership {
tags: vec![],
..pass
};
assert!(!membership_passes_post_filter(&bare, &legacy));
}
// With `min_memory_gb = 64` and `require_gpu`, a 128 GB two-GPU membership
// passes, a 32 GB one fails, and a membership without a hardware summary fails.
#[test]
fn membership_passes_post_filter_enforces_min_memory_and_gpu() {
let legacy = LegacyFilter {
min_memory_gb: Some(64),
require_gpu: true,
..LegacyFilter::default()
};
let ok = CapabilityMembership {
class_hash: 0x100,
tags: vec![],
hardware: Some(super::super::capability::HardwareSummary {
gpu_vendor: Some("nvidia".into()),
gpu_count: 2,
memory_gb: Some(128),
vram_gb: Some(80),
}),
state: NodeState::Idle,
region: None,
price_quote: None,
reflex_addr: None,
allowed_nodes: Vec::new(),
allowed_subnets: Vec::new(),
allowed_groups: Vec::new(),
metadata: std::collections::BTreeMap::new(),
};
assert!(membership_passes_post_filter(&ok, &legacy));
let low_mem = CapabilityMembership {
hardware: Some(super::super::capability::HardwareSummary {
gpu_vendor: Some("nvidia".into()),
gpu_count: 2,
memory_gb: Some(32),
vram_gb: Some(80),
}),
..ok.clone()
};
assert!(!membership_passes_post_filter(&low_mem, &legacy));
let no_hw = CapabilityMembership {
hardware: None,
..ok
};
assert!(!membership_passes_post_filter(&no_hw, &legacy));
}
// The same node announced under two classes appears only once in the result.
#[test]
fn find_nodes_matching_dedupes_publisher_across_classes() {
let fold = new_fold();
let kp = EntityKeypair::generate();
fold.apply(sign_member(&kp, 0xAA, 0x100, vec!["gpu"], None))
.expect("apply 0x100");
fold.apply(sign_member(&kp, 0xAA, 0x101, vec!["gpu"], None))
.expect("apply 0x101");
let mut legacy = LegacyFilter::default();
legacy.require_tags.push("gpu".into());
let nodes = find_nodes_matching(&fold, &legacy);
assert_eq!(nodes, vec![0xAA]);
}
// For each probed filter and candidate node (including 0xCC, which is never
// announced), the per-node check must give the same verdict as membership in
// the bulk query result. A node absent from the fold never matches.
#[test]
fn target_matches_filter_agrees_with_find_nodes_matching() {
let fold = new_fold();
let kp = EntityKeypair::generate();
fold.apply(sign_member(&kp, 0xAA, 0x100, vec!["gpu", "cuda"], None))
.expect("apply AA");
fold.apply(sign_member(&kp, 0xBB, 0x100, vec!["cpu-only"], None))
.expect("apply BB");
let probe = |legacy: &LegacyFilter, candidates: &[NodeId]| {
let bulk: HashSet<NodeId> = find_nodes_matching(&fold, legacy).into_iter().collect();
for &n in candidates {
assert_eq!(
bulk.contains(&n),
target_matches_filter(&fold, n, legacy),
"node 0x{:x} verdict mismatch for filter {:?}",
n,
legacy.require_tags
);
}
};
probe(&LegacyFilter::default(), &[0xAA, 0xBB, 0xCC]);
let mut f = LegacyFilter::default();
f.require_tags.push("gpu".into());
probe(&f, &[0xAA, 0xBB, 0xCC]);
assert!(!target_matches_filter(
&fold,
0xDEAD,
&LegacyFilter::default()
));
}
// The per-node check applies the post-filter too: a node advertising 32 GB is
// rejected at a 64 GB minimum and accepted at a 16 GB minimum.
#[test]
fn target_matches_filter_applies_post_filter_predicates() {
let fold = new_fold();
let kp = EntityKeypair::generate();
let hw = HardwareSummary {
gpu_vendor: None,
gpu_count: 0,
memory_gb: Some(32),
vram_gb: None,
};
fold.apply(sign_member(&kp, 0xAA, 0x100, vec!["gpu"], Some(hw)))
.expect("apply AA");
let mut tight = LegacyFilter::default();
tight.require_tags.push("gpu".into());
tight.min_memory_gb = Some(64);
assert!(!target_matches_filter(&fold, 0xAA, &tight));
let mut loose = LegacyFilter::default();
loose.require_tags.push("gpu".into());
loose.min_memory_gb = Some(16);
assert!(target_matches_filter(&fold, 0xAA, &loose));
}
// Covers the four single-kind outcomes of scope parsing: global (no
// recognised scope tag), subnet-local, one tenant, and one region.
#[test]
fn scope_from_membership_tags_parses_canonical_strings() {
let global = scope_from_membership_tags(&["gpu".into(), "scope:global".into()]);
assert!(matches!(global, CapabilityScope::Global));
let subnet_local = scope_from_membership_tags(&["scope:subnet-local".into(), "gpu".into()]);
assert!(matches!(subnet_local, CapabilityScope::SubnetLocal));
let tenant = scope_from_membership_tags(&["scope:tenant:acme".into()]);
match tenant {
CapabilityScope::Tenants(ts) => assert_eq!(ts, vec!["acme".to_string()]),
other => panic!("expected Tenants, got {other:?}"),
}
let region = scope_from_membership_tags(&["scope:region:us-east".into()]);
match region {
CapabilityScope::Regions(rs) => assert_eq!(rs, vec!["us-east".to_string()]),
other => panic!("expected Regions, got {other:?}"),
}
}
// A legacy announcement with 128 GB of memory and one 80 GB NVIDIA GPU
// translates into a hardware summary with those values, keeping the node id
// and using the legacy version (7) as the generation.
#[test]
fn translate_announcement_projects_legacy_hardware_into_summary() {
use crate::adapter::net::behavior::capability::{
CapabilityAnnouncement, CapabilitySet, GpuInfo, GpuVendor as LegacyGpuVendor,
HardwareCapabilities,
};
use crate::adapter::net::identity::EntityId;
let caps = CapabilitySet::new().with_hardware(
HardwareCapabilities::new()
.with_memory(128)
.with_gpu(GpuInfo {
vendor: LegacyGpuVendor::Nvidia,
model: "h100".into(),
vram_gb: 80,
compute_units: 0,
tensor_cores: 0,
fp16_tflops_x10: 0,
}),
);
let ann = CapabilityAnnouncement::new(0xAA, EntityId::from_bytes([0u8; 32]), 7, caps);
let translated = translate_announcement(&ann);
assert_eq!(translated.node_id, 0xAA);
assert_eq!(translated.generation, 7);
let hw = translated.payload.hardware.expect("hardware summary set");
assert_eq!(hw.memory_gb, Some(128));
assert_eq!(hw.gpu_count, 1);
assert_eq!(hw.gpu_vendor.as_deref(), Some("nvidia"));
assert_eq!(hw.vram_gb, Some(80));
}
// A legacy version of 0 is raised to generation 1 by the translation.
#[test]
fn translate_announcement_promotes_version_zero_to_generation_one() {
use crate::adapter::net::behavior::capability::{CapabilityAnnouncement, CapabilitySet};
use crate::adapter::net::identity::EntityId;
let ann = CapabilityAnnouncement::new(
0xAA,
EntityId::from_bytes([0u8; 32]),
0,
CapabilitySet::new(),
);
let translated = translate_announcement(&ann);
assert_eq!(translated.generation, 1);
}
// Node 0xAA is tagged `scope:subnet-local` but the lookup reports only 0xBB as
// same-subnet, so the scoped query is expected to return 0xBB alone.
#[test]
fn find_nodes_matching_scoped_excludes_subnet_local_non_same_subnet() {
let fold = new_fold();
let kp = EntityKeypair::generate();
fold.apply(sign_member(
&kp,
0xAA,
0x100,
vec!["gpu", "scope:subnet-local"],
None,
))
.expect("apply AA subnet-local");
fold.apply(sign_member(&kp, 0xBB, 0x100, vec!["gpu"], None))
.expect("apply BB global");
let mut legacy = LegacyFilter::default();
legacy.require_tags.push("gpu".into());
let lookup = |nid: NodeId| nid == 0xBB;
let mut nodes =
find_nodes_matching_scoped(&fold, &legacy, &ScopeFilter::SameSubnet, lookup);
nodes.sort();
assert_eq!(nodes, vec![0xBB]);
}
}