use std::collections::{BTreeMap, HashMap, HashSet};
use std::time::Duration;
use serde::{Deserialize, Serialize};
use super::state::{FoldIndex, FoldState, NodeId};
use super::FoldKind;
/// State value carried in [`CapabilityMembership::state`].
///
/// It is the key type of the `by_state` secondary index and the operand of
/// `CapabilityQuery::InState`. Variants are serialized with snake_case names
/// (`idle`, `busy`, `reserved`, `faulty`). The derived `Ord` follows the
/// declaration order below.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum NodeState {
Idle,
Busy,
Reserved,
Faulty,
}
/// Optional hardware description attached to a [`CapabilityMembership`].
///
/// Nothing in this module reads these fields; they are carried as payload only.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct HardwareSummary {
/// GPU vendor name, when one is reported.
pub gpu_vendor: Option<String>,
/// Number of GPUs reported.
pub gpu_count: u8,
/// Memory size — presumably gigabytes, judging by the `_gb` suffix; confirm with the producer.
pub memory_gb: Option<u32>,
/// Video memory size — presumably gigabytes, judging by the `_gb` suffix; confirm with the producer.
pub vram_gb: Option<u32>,
}
/// Payload type of [`CapabilityFold`]: one capability announcement for one class.
///
/// The fold key for a payload is `(class_hash, node_id)`, so a node may hold
/// one entry per class.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct CapabilityMembership {
/// Class identifier; becomes the first component of the fold key.
pub class_hash: u64,
/// Tags the entry is indexed under (`by_tag`).
pub tags: Vec<String>,
/// Optional hardware description; not consulted by queries in this module.
pub hardware: Option<HardwareSummary>,
/// State the entry is indexed under (`by_state`).
pub state: NodeState,
/// Region the entry is indexed under (`by_region`); `None` entries are not region-indexed.
pub region: Option<String>,
/// Not read in this module — presumably an advertised price; units unknown, confirm with consumers.
pub price_quote: Option<u64>,
/// Address surfaced by `reflex_addr_for`.
pub reflex_addr: Option<std::net::SocketAddr>,
/// Not read in this module — presumably an allow-list of node ids; confirm with consumers.
pub allowed_nodes: Vec<u64>,
/// Not read in this module — presumably an allow-list of subnets; confirm with consumers.
pub allowed_subnets: Vec<super::super::subnet::SubnetId>,
/// Not read in this module — presumably an allow-list of groups; confirm with consumers.
pub allowed_groups: Vec<super::super::group::GroupId>,
/// Free-form key/value pairs; not consulted by queries in this module.
pub metadata: BTreeMap<String, String>,
}
/// Query type accepted by `CapabilityFold::query`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CapabilityQuery {
/// Entries whose key's class component equals the given value (full scan of the entries).
InClass(u64),
/// Entries indexed under every listed tag. An empty list matches every indexed entry.
HasAllTags(Vec<String>),
/// Entries indexed under at least one of the listed tags.
HasAnyTag(Vec<String>),
/// Entries indexed under the given state.
InState(NodeState),
/// Entries indexed under the given region.
InRegion(String),
/// Intersection of every populated axis of the filter; see [`CapabilityFilter`].
Composite(CapabilityFilter),
}
/// Multi-axis filter evaluated by `composite_query`.
///
/// An axis left at its default (`None` / empty vector) is not applied. The
/// default filter therefore matches every entry.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct CapabilityFilter {
/// Required class component of the key.
pub class: Option<u64>,
/// Tags that must all be present on a matching entry.
pub tags_all: Vec<String>,
/// Tags of which at least one must be present on a matching entry.
pub tags_any: Vec<String>,
/// Required state.
pub state: Option<NodeState>,
/// Required region.
pub region: Option<String>,
/// Maximum number of results; `0` means no limit. Candidates come out of a
/// `HashSet`, so which entries survive truncation is unspecified.
pub limit: usize,
}
/// One query hit: the fold key `(class_hash, node_id)` paired with a clone of the stored payload.
pub type CapabilityMatch = ((u64, NodeId), CapabilityMembership);
/// Secondary indexes over capability entries, maintained through the
/// `FoldIndex` callbacks. Each map goes from an attribute value to the set of
/// fold keys `(class_hash, node_id)` carrying that value.
#[derive(Debug, Default)]
pub struct CapabilityIndexInner {
// tag -> keys of entries listing that tag
by_tag: HashMap<String, HashSet<(u64, NodeId)>>,
// region -> keys of entries announcing that region (entries without a region are absent)
by_region: HashMap<String, HashSet<(u64, NodeId)>>,
// state -> keys of entries in that state (every inserted entry appears in exactly one bucket)
by_state: HashMap<NodeState, HashSet<(u64, NodeId)>>,
}
impl FoldIndex<CapabilityFold> for CapabilityIndexInner {
fn on_insert(&mut self, key: &(u64, NodeId), payload: &CapabilityMembership) {
for tag in &payload.tags {
self.by_tag.entry(tag.clone()).or_default().insert(*key);
}
if let Some(region) = &payload.region {
self.by_region
.entry(region.clone())
.or_default()
.insert(*key);
}
self.by_state.entry(payload.state).or_default().insert(*key);
}
fn on_remove(&mut self, key: &(u64, NodeId), payload: &CapabilityMembership) {
for tag in &payload.tags {
if let Some(set) = self.by_tag.get_mut(tag) {
set.remove(key);
if set.is_empty() {
self.by_tag.remove(tag);
}
}
}
if let Some(region) = &payload.region {
if let Some(set) = self.by_region.get_mut(region) {
set.remove(key);
if set.is_empty() {
self.by_region.remove(region);
}
}
}
if let Some(set) = self.by_state.get_mut(&payload.state) {
set.remove(key);
if set.is_empty() {
self.by_state.remove(&payload.state);
}
}
}
fn clear(&mut self) {
self.by_tag.clear();
self.by_region.clear();
self.by_state.clear();
}
}
/// Marker type selecting the capability fold; all behaviour lives in its `FoldKind` impl.
#[derive(Debug)]
pub struct CapabilityFold;
/// Fold definition for capability announcements: keyed by `(class_hash, node_id)`,
/// carrying [`CapabilityMembership`] payloads and answering [`CapabilityQuery`].
impl FoldKind for CapabilityFold {
    const KIND_ID: u16 = 1;
    const CHANNEL_PREFIX: &'static str = "fold:cap:";
    const DEFAULT_TTL: Duration = Duration::from_secs(60);

    type Key = (u64, NodeId);
    type Payload = CapabilityMembership;
    type Query = CapabilityQuery;
    type Result = Vec<CapabilityMatch>;
    type Index = CapabilityIndexInner;

    /// Builds the fold key from the payload's class hash and the publishing node.
    fn key_for(node_id: NodeId, payload: &Self::Payload) -> Self::Key {
        let class = payload.class_hash;
        (class, node_id)
    }

    /// Creates an empty secondary index.
    fn build_index() -> CapabilityIndexInner {
        Default::default()
    }

    /// Resolves `query` to the matching `(key, payload)` pairs.
    ///
    /// Index-backed variants look keys up in `index` and then fetch payloads
    /// from `state`; keys the index knows but `state` no longer holds are
    /// silently skipped. `InClass` scans every entry.
    fn query(
        state: &FoldState<Self>,
        index: &CapabilityIndexInner,
        query: CapabilityQuery,
    ) -> Vec<CapabilityMatch> {
        // Turns a key into a match by cloning its payload, if the entry still exists.
        let hydrate = |key: (u64, NodeId)| {
            state
                .entries
                .get(&key)
                .map(|entry| (key, entry.payload.clone()))
        };

        match query {
            CapabilityQuery::InClass(class) => {
                let mut hits = Vec::new();
                for (key, entry) in state.entries.iter() {
                    if key.0 == class {
                        hits.push((*key, entry.payload.clone()));
                    }
                }
                hits
            }
            CapabilityQuery::HasAllTags(tags) => {
                let keys = resolve_keys_all_tags(index, &tags);
                keys.into_iter().filter_map(hydrate).collect()
            }
            CapabilityQuery::HasAnyTag(tags) => {
                // Union of every listed tag's bucket; the set removes duplicates.
                let union: HashSet<(u64, NodeId)> = tags
                    .iter()
                    .filter_map(|tag| index.by_tag.get(tag))
                    .flatten()
                    .copied()
                    .collect();
                union.into_iter().filter_map(hydrate).collect()
            }
            CapabilityQuery::InState(wanted) => match index.by_state.get(&wanted) {
                Some(members) => members.iter().copied().filter_map(hydrate).collect(),
                None => Vec::new(),
            },
            CapabilityQuery::InRegion(wanted) => match index.by_region.get(&wanted) {
                Some(members) => members.iter().copied().filter_map(hydrate).collect(),
                None => Vec::new(),
            },
            CapabilityQuery::Composite(filter) => composite_query(state, index, &filter),
        }
    }
}
/// Returns the keys indexed under every tag in `tags`.
///
/// With no tags there is nothing to constrain, so every key known to the
/// state index is returned. If any tag has no bucket the result is empty.
fn resolve_keys_all_tags(index: &CapabilityIndexInner, tags: &[String]) -> HashSet<(u64, NodeId)> {
    if tags.is_empty() {
        let mut everything: HashSet<(u64, NodeId)> = HashSet::new();
        for members in index.by_state.values() {
            everything.extend(members.iter().copied());
        }
        return everything;
    }

    // Resolve every bucket first: one unknown tag makes the conjunction unsatisfiable.
    let mut buckets: Vec<&HashSet<(u64, NodeId)>> = Vec::with_capacity(tags.len());
    for tag in tags {
        match index.by_tag.get(tag) {
            Some(bucket) => buckets.push(bucket),
            None => return HashSet::new(),
        }
    }

    // Start from the smallest bucket so the working set is as small as possible.
    buckets.sort_by_key(|bucket| bucket.len());
    let (smallest, rest) = match buckets.split_first() {
        Some(parts) => parts,
        None => return HashSet::new(),
    };

    let mut survivors: HashSet<(u64, NodeId)> = smallest.iter().copied().collect();
    for bucket in rest {
        if survivors.is_empty() {
            break;
        }
        survivors.retain(|key| bucket.contains(key));
    }
    survivors
}
/// Evaluates a [`CapabilityFilter`]: the result is the intersection of every
/// populated axis (`class`, `tags_all`, `tags_any`, `state`, `region`),
/// capped at `filter.limit` entries when the limit is non-zero.
///
/// The candidate set is seeded from the first populated axis in the order
/// `tags_all`, `state`, `region`, `class`; with none set it starts from every
/// entry. Remaining axes then only shrink the set. Candidates live in a
/// `HashSet`, so the order of the returned matches — and which ones survive
/// the limit — is unspecified.
fn composite_query(
    state: &FoldState<CapabilityFold>,
    index: &CapabilityIndexInner,
    filter: &CapabilityFilter,
) -> Vec<CapabilityMatch> {
    let mut candidates: HashSet<(u64, NodeId)> = if !filter.tags_all.is_empty() {
        resolve_keys_all_tags(index, &filter.tags_all)
    } else if let Some(state_filter) = filter.state {
        index
            .by_state
            .get(&state_filter)
            .cloned()
            .unwrap_or_default()
    } else if let Some(region) = &filter.region {
        index.by_region.get(region).cloned().unwrap_or_default()
    } else if let Some(class) = filter.class {
        state
            .entries
            .keys()
            .filter(|(c, _)| *c == class)
            .copied()
            .collect()
    } else {
        state.entries.keys().copied().collect()
    };

    // Re-applying the axis that produced the seed is a harmless no-op, so the
    // narrowing steps below do not need to know which axis that was.
    if let Some(class) = filter.class {
        candidates.retain(|(c, _)| *c == class);
    }
    if let Some(state_filter) = filter.state {
        match index.by_state.get(&state_filter) {
            Some(bucket) => candidates.retain(|k| bucket.contains(k)),
            None => candidates.clear(),
        }
    }
    if let Some(region) = &filter.region {
        match index.by_region.get(region) {
            Some(bucket) => candidates.retain(|k| bucket.contains(k)),
            None => candidates.clear(),
        }
    }
    if !filter.tags_any.is_empty() {
        let mut tags_any_union: HashSet<(u64, NodeId)> = HashSet::new();
        for tag in &filter.tags_any {
            if let Some(bucket) = index.by_tag.get(tag) {
                tags_any_union.extend(bucket.iter().copied());
            }
        }
        candidates.retain(|k| tags_any_union.contains(k));
    }

    // `tags_all` needs no second pass: when it is populated the seed already
    // is exactly that set, and every step above only removes keys.

    // Apply the limit before cloning so payloads beyond it are never copied.
    // A limit of zero means "unlimited".
    let cap = if filter.limit == 0 {
        usize::MAX
    } else {
        filter.limit
    };
    candidates
        .into_iter()
        .filter_map(|k| state.entries.get(&k).map(|e| (k, e.payload.clone())))
        .take(cap)
        .collect()
}
/// Returns the de-duplicated tags that `node_id` advertises across all of its
/// entries in `fold`. A node with no entries yields an empty vector.
pub fn capability_tags_for(fold: &super::Fold<CapabilityFold>, node_id: NodeId) -> Vec<String> {
    fold.with_state(|snapshot| {
        let tags = tags_union_for(snapshot, node_id);
        tags
    })
}
/// Batched form of `capability_tags_for`: maps every node that currently has
/// entries in `fold` to the de-duplicated union of its tags, computed under a
/// single `with_state` call.
pub fn capability_tags_for_all(
    fold: &super::Fold<CapabilityFold>,
) -> std::collections::HashMap<NodeId, Vec<String>> {
    fold.with_state(|state| {
        state
            .by_node
            .keys()
            .map(|node_id| (*node_id, tags_union_for(state, *node_id)))
            .collect::<std::collections::HashMap<NodeId, Vec<String>>>()
    })
}
/// Collects the distinct tags across every entry published by `node_id`.
///
/// Returns an empty vector when the node has no entries. The result is sorted
/// lexicographically, so repeated calls over the same state give identical
/// output instead of depending on hash iteration order.
fn tags_union_for(state: &FoldState<CapabilityFold>, node_id: NodeId) -> Vec<String> {
    let Some(keys) = state.by_node.get(&node_id) else {
        return Vec::new();
    };
    // An ordered set removes duplicates and fixes the output order in one go.
    let mut unique: std::collections::BTreeSet<String> = std::collections::BTreeSet::new();
    for key in keys {
        if let Some(entry) = state.entries.get(key) {
            for tag in &entry.payload.tags {
                // Look up by `&str` first so a tag shared by several entries
                // is cloned only the first time it is seen.
                if !unique.contains(tag.as_str()) {
                    unique.insert(tag.clone());
                }
            }
        }
    }
    unique.into_iter().collect()
}
/// Returns the first `reflex_addr` found among the entries published by
/// `node_id`, walking the node's keys in the order `by_node` yields them.
///
/// `None` means the node is unknown or none of its entries advertise an address.
pub fn reflex_addr_for(
    fold: &super::Fold<CapabilityFold>,
    node_id: NodeId,
) -> Option<std::net::SocketAddr> {
    fold.with_state(|state| {
        let keys = state.by_node.get(&node_id)?;
        let mut advertised = None;
        for key in keys {
            advertised = state
                .entries
                .get(key)
                .and_then(|entry| entry.payload.reflex_addr);
            if advertised.is_some() {
                break;
            }
        }
        advertised
    })
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;
use super::*;
use crate::adapter::net::behavior::fold::{
ApplyOutcome, EnvelopeMeta, Fold, FoldRegistry, SignedAnnouncement,
};
use crate::adapter::net::identity::EntityKeypair;
// Test helper: signs a capability announcement with no reflex address.
// Delegates to `sign_cap_with_reflex` with `reflex_addr = None`.
fn sign_cap(
keypair: &EntityKeypair,
publisher: NodeId,
generation: u64,
class: u64,
tags: Vec<&str>,
state: NodeState,
region: Option<&str>,
) -> SignedAnnouncement<CapabilityMembership> {
sign_cap_with_reflex(
keypair, publisher, generation, class, tags, state, region, None,
)
}
// Test helper: builds a `CapabilityMembership` from the given fields (all
// other fields empty / `None`) and signs it for `CapabilityFold`. `class` is
// used both as the payload's `class_hash` and as the third argument to
// `SignedAnnouncement::sign`. Panics if signing fails.
// The lint is allowed because each argument maps to one payload/envelope field
// and grouping them would only obscure the test call sites.
#[allow(clippy::too_many_arguments)]
fn sign_cap_with_reflex(
keypair: &EntityKeypair,
publisher: NodeId,
generation: u64,
class: u64,
tags: Vec<&str>,
state: NodeState,
region: Option<&str>,
reflex_addr: Option<std::net::SocketAddr>,
) -> SignedAnnouncement<CapabilityMembership> {
SignedAnnouncement::sign(
keypair,
CapabilityFold::KIND_ID,
class,
publisher,
generation,
EnvelopeMeta::default(),
CapabilityMembership {
class_hash: class,
tags: tags.into_iter().map(String::from).collect(),
hardware: None,
state,
region: region.map(String::from),
price_quote: None,
reflex_addr,
allowed_nodes: Vec::new(),
allowed_subnets: Vec::new(),
allowed_groups: Vec::new(),
metadata: BTreeMap::new(),
},
)
.expect("sign succeeds")
}
// Test helper: a fresh capability fold built with a zero sweep interval.
// NOTE(review): presumably a zero interval disables the background sweeper so
// tests drive expiry explicitly — confirm against `Fold::with_sweep_interval`.
fn new_fold() -> Fold<CapabilityFold> {
Fold::with_sweep_interval(Duration::ZERO)
}
// A first announcement is reported as `Inserted` and is then reachable through
// the class scan and through each secondary index (tag, state, region).
#[test]
fn first_announcement_installs_and_populates_secondary_index() {
let fold = new_fold();
let kp = EntityKeypair::generate();
let outcome = fold
.apply(sign_cap(
&kp,
0xA,
1,
0x100,
vec!["hardware.gpu", "vendor.nvidia"],
NodeState::Idle,
Some("us-east"),
))
.expect("apply");
assert_eq!(outcome, ApplyOutcome::Inserted);
let hits = fold.query(CapabilityQuery::InClass(0x100));
assert_eq!(hits.len(), 1);
assert_eq!(hits[0].0, (0x100, 0xA));
let hits = fold.query(CapabilityQuery::HasAllTags(vec!["hardware.gpu".into()]));
assert_eq!(hits.len(), 1);
let hits = fold.query(CapabilityQuery::InState(NodeState::Idle));
assert_eq!(hits.len(), 1);
let hits = fold.query(CapabilityQuery::InRegion("us-east".into()));
assert_eq!(hits.len(), 1);
}
// Two publishers announcing the same class get separate entries, because the
// key includes the node id; each is found under its own state.
#[test]
fn each_publisher_owns_its_own_class_entry_no_cross_override() {
let fold = new_fold();
let kp_a = EntityKeypair::generate();
let kp_b = EntityKeypair::generate();
fold.apply(sign_cap(
&kp_a,
0xA,
1,
0x100,
vec!["gpu"],
NodeState::Idle,
None,
))
.expect("a");
fold.apply(sign_cap(
&kp_b,
0xB,
1,
0x100,
vec!["gpu"],
NodeState::Busy,
None,
))
.expect("b");
let hits = fold.query(CapabilityQuery::InClass(0x100));
assert_eq!(hits.len(), 2, "both publishers' entries coexist");
let idle = fold.query(CapabilityQuery::InState(NodeState::Idle));
assert_eq!(idle.len(), 1);
assert_eq!(idle[0].0, (0x100, 0xA));
let busy = fold.query(CapabilityQuery::InState(NodeState::Busy));
assert_eq!(busy.len(), 1);
assert_eq!(busy[0].0, (0x100, 0xB));
}
// Re-announcing the same (class, node) with a higher generation replaces the
// entry: the old tag, state and region no longer match, the new ones do.
#[test]
fn replace_updates_secondary_index_drops_stale_tags() {
let fold = new_fold();
let kp = EntityKeypair::generate();
fold.apply(sign_cap(
&kp,
0xA,
1,
0x100,
vec!["gpu"],
NodeState::Idle,
Some("us-east"),
))
.expect("v1");
fold.apply(sign_cap(
&kp,
0xA,
2,
0x100,
vec!["tpu"],
NodeState::Busy,
Some("us-west"),
))
.expect("v2");
let stale = fold.query(CapabilityQuery::HasAllTags(vec!["gpu".into()]));
assert!(stale.is_empty());
let fresh = fold.query(CapabilityQuery::HasAllTags(vec!["tpu".into()]));
assert_eq!(fresh.len(), 1);
let stale_state = fold.query(CapabilityQuery::InState(NodeState::Idle));
assert!(stale_state.is_empty());
let new_state = fold.query(CapabilityQuery::InState(NodeState::Busy));
assert_eq!(new_state.len(), 1);
assert!(fold
.query(CapabilityQuery::InRegion("us-east".into()))
.is_empty());
assert_eq!(
fold.query(CapabilityQuery::InRegion("us-west".into()))
.len(),
1
);
}
// `HasAllTags` is a conjunction: with nodes tagged {a,b,c}, {a,b} and {a},
// asking for more tags returns progressively fewer nodes.
#[test]
fn has_all_tags_finds_only_entries_carrying_every_tag() {
let fold = new_fold();
let kp = EntityKeypair::generate();
fold.apply(sign_cap(
&kp,
0x1,
1,
0x100,
vec!["a", "b", "c"],
NodeState::Idle,
None,
))
.unwrap();
fold.apply(sign_cap(
&kp,
0x2,
1,
0x100,
vec!["a", "b"],
NodeState::Idle,
None,
))
.unwrap();
fold.apply(sign_cap(
&kp,
0x3,
1,
0x100,
vec!["a"],
NodeState::Idle,
None,
))
.unwrap();
// Results are compared as sets because query order is unspecified.
let hits: std::collections::HashSet<_> = fold
.query(CapabilityQuery::HasAllTags(vec![
"a".into(),
"b".into(),
"c".into(),
]))
.into_iter()
.map(|((_, n), _)| n)
.collect();
assert_eq!(hits, [0x1].into_iter().collect());
let hits: std::collections::HashSet<_> = fold
.query(CapabilityQuery::HasAllTags(vec!["a".into(), "b".into()]))
.into_iter()
.map(|((_, n), _)| n)
.collect();
assert_eq!(hits, [0x1, 0x2].into_iter().collect());
let hits: std::collections::HashSet<_> = fold
.query(CapabilityQuery::HasAllTags(vec!["a".into()]))
.into_iter()
.map(|((_, n), _)| n)
.collect();
assert_eq!(hits, [0x1, 0x2, 0x3].into_iter().collect());
}
// `HasAnyTag` is a disjunction: asking for x or y returns the nodes tagged x
// and y, and leaves out the node tagged only z.
#[test]
fn has_any_tag_returns_union_across_buckets() {
let fold = new_fold();
let kp = EntityKeypair::generate();
fold.apply(sign_cap(
&kp,
0x1,
1,
0x100,
vec!["x"],
NodeState::Idle,
None,
))
.unwrap();
fold.apply(sign_cap(
&kp,
0x2,
1,
0x100,
vec!["y"],
NodeState::Idle,
None,
))
.unwrap();
fold.apply(sign_cap(
&kp,
0x3,
1,
0x100,
vec!["z"],
NodeState::Idle,
None,
))
.unwrap();
let hits: std::collections::HashSet<_> = fold
.query(CapabilityQuery::HasAnyTag(vec!["x".into(), "y".into()]))
.into_iter()
.map(|((_, n), _)| n)
.collect();
assert_eq!(hits, [0x1, 0x2].into_iter().collect());
}
// A composite filter with class, tag, state and region all set returns only
// the one node matching every axis; nodes differing in state or region drop out.
#[test]
fn composite_query_intersects_every_populated_filter_axis() {
let fold = new_fold();
let kp = EntityKeypair::generate();
fold.apply(sign_cap(
&kp,
0xA,
1,
0x100,
vec!["gpu"],
NodeState::Idle,
Some("us-east"),
))
.unwrap();
fold.apply(sign_cap(
&kp,
0xB,
1,
0x100,
vec!["gpu"],
NodeState::Busy,
Some("us-east"),
))
.unwrap();
fold.apply(sign_cap(
&kp,
0xC,
1,
0x100,
vec!["gpu"],
NodeState::Idle,
Some("us-west"),
))
.unwrap();
let filter = CapabilityFilter {
class: Some(0x100),
tags_all: vec!["gpu".into()],
state: Some(NodeState::Idle),
region: Some("us-east".into()),
..CapabilityFilter::default()
};
let hits: Vec<_> = fold
.query(CapabilityQuery::Composite(filter))
.into_iter()
.map(|((_, n), _)| n)
.collect();
assert_eq!(hits, vec![0xA]);
}
// With ten matching entries and `limit: 3`, exactly three results come back.
// Only the count is asserted because which three is unspecified.
#[test]
fn composite_query_honours_limit() {
let fold = new_fold();
let kp = EntityKeypair::generate();
for i in 0..10 {
fold.apply(sign_cap(
&kp,
i,
1,
0x100,
vec!["gpu"],
NodeState::Idle,
None,
))
.unwrap();
}
let filter = CapabilityFilter {
class: Some(0x100),
limit: 3,
..CapabilityFilter::default()
};
let hits = fold.query(CapabilityQuery::Composite(filter));
assert_eq!(hits.len(), 3);
}
// `tags_all` and `tags_any` combine: every node has "common", but only the
// nodes that also have "fast" or "slow" are returned.
#[test]
fn composite_query_with_tags_any_filters_correctly() {
let fold = new_fold();
let kp = EntityKeypair::generate();
fold.apply(sign_cap(
&kp,
0xA,
1,
0x100,
vec!["common", "fast"],
NodeState::Idle,
None,
))
.unwrap();
fold.apply(sign_cap(
&kp,
0xB,
1,
0x100,
vec!["common", "slow"],
NodeState::Idle,
None,
))
.unwrap();
fold.apply(sign_cap(
&kp,
0xC,
1,
0x100,
vec!["common"],
NodeState::Idle,
None,
))
.unwrap();
let filter = CapabilityFilter {
tags_all: vec!["common".into()],
tags_any: vec!["fast".into(), "slow".into()],
..CapabilityFilter::default()
};
let hits: std::collections::HashSet<_> = fold
.query(CapabilityQuery::Composite(filter))
.into_iter()
.map(|((_, n), _)| n)
.collect();
assert_eq!(hits, [0xA, 0xB].into_iter().collect());
}
// Evicting node 0xA removes both of its class entries (counted as two
// evictions), leaves node 0xB's entry in place, and clears 0xA from the tag index.
#[test]
fn evict_node_drops_every_class_entry_and_cleans_indexes() {
let fold = new_fold();
let kp = EntityKeypair::generate();
fold.apply(sign_cap(
&kp,
0xA,
1,
0x100,
vec!["gpu"],
NodeState::Idle,
Some("r1"),
))
.unwrap();
fold.apply(sign_cap(
&kp,
0xA,
1,
0x200,
vec!["tpu"],
NodeState::Busy,
Some("r2"),
))
.unwrap();
fold.apply(sign_cap(
&kp,
0xB,
1,
0x100,
vec!["gpu"],
NodeState::Idle,
Some("r1"),
))
.unwrap();
assert_eq!(fold.stats().entries, 3);
fold.evict_node(0xA, "test");
assert_eq!(fold.stats().entries, 1);
assert_eq!(fold.stats().evictions, 2);
let gpu_hits: std::collections::HashSet<_> = fold
.query(CapabilityQuery::HasAllTags(vec!["gpu".into()]))
.into_iter()
.map(|((_, n), _)| n)
.collect();
assert_eq!(gpu_hits, [0xB].into_iter().collect());
let tpu_hits = fold.query(CapabilityQuery::HasAllTags(vec!["tpu".into()]));
assert!(tpu_hits.is_empty());
}
// A publisher with two class entries, only one of which carries an address:
// `reflex_addr_for` skips the entry without one and returns the address.
// A node with no entries yields `None`.
#[test]
fn reflex_addr_for_returns_first_advertised_addr_across_publisher_classes() {
use std::net::SocketAddr;
let fold = new_fold();
let kp = EntityKeypair::generate();
let addr: SocketAddr = "203.0.113.4:7000".parse().unwrap();
fold.apply(sign_cap_with_reflex(
&kp,
0xAA,
1,
0x100,
vec![],
NodeState::Idle,
None,
None,
))
.expect("class 0x100");
fold.apply(sign_cap_with_reflex(
&kp,
0xAA,
1,
0x101,
vec![],
NodeState::Idle,
None,
Some(addr),
))
.expect("class 0x101");
assert_eq!(super::reflex_addr_for(&fold, 0xAA), Some(addr));
assert_eq!(super::reflex_addr_for(&fold, 0xBB), None);
}
// A known publisher whose only entry has no reflex address yields `None`.
#[test]
fn reflex_addr_for_returns_none_when_publisher_advertises_no_addr() {
let fold = new_fold();
let kp = EntityKeypair::generate();
fold.apply(sign_cap(&kp, 0xAA, 1, 0x100, vec![], NodeState::Idle, None))
.expect("class 0x100");
assert_eq!(super::reflex_addr_for(&fold, 0xAA), None);
}
// The batched tag lookup returns one entry per publisher, unions tags across a
// publisher's classes without duplicates, and agrees with the per-node lookup.
#[test]
fn capability_tags_for_all_matches_per_node_walk() {
let fold = new_fold();
let kp_a = EntityKeypair::generate();
let kp_b = EntityKeypair::generate();
fold.apply(sign_cap(
&kp_a,
0xA,
1,
0x100,
vec!["gpu", "vendor.nvidia"],
NodeState::Idle,
None,
))
.expect("a-100");
fold.apply(sign_cap(
&kp_a,
0xA,
1,
0x200,
vec!["gpu", "model:llama"],
NodeState::Idle,
None,
))
.expect("a-200");
fold.apply(sign_cap(
&kp_b,
0xB,
1,
0x100,
vec!["cpu-only"],
NodeState::Idle,
None,
))
.expect("b-100");
let batched = super::capability_tags_for_all(&fold);
assert_eq!(batched.len(), 2);
// Tags are sorted before comparing so the test does not rely on output order.
let mut tags_a = batched.get(&0xA).cloned().unwrap_or_default();
tags_a.sort();
assert_eq!(
tags_a,
vec![
"gpu".to_string(),
"model:llama".to_string(),
"vendor.nvidia".to_string()
],
"publisher A unions tags across both class entries"
);
let mut tags_b = batched.get(&0xB).cloned().unwrap_or_default();
tags_b.sort();
assert_eq!(tags_b, vec!["cpu-only".to_string()]);
// Cross-check every batched entry against the single-node lookup.
for (node_id, batched_tags) in &batched {
let mut single = super::capability_tags_for(&fold, *node_id);
single.sort();
let mut batched_sorted = batched_tags.clone();
batched_sorted.sort();
assert_eq!(single, batched_sorted, "mismatch for node 0x{:x}", node_id);
}
}
// With no announcements applied, the batched tag lookup returns an empty map.
#[test]
fn capability_tags_for_all_returns_empty_for_empty_fold() {
let fold = new_fold();
let batched = super::capability_tags_for_all(&fold);
assert!(batched.is_empty());
}
// An entry announced with `ttl_secs: Some(0)` is removed by an explicit sweep:
// the entry count drops to zero, one expiry is recorded, and the tag index no
// longer returns it.
#[test]
fn runtime_ttl_sweeps_stale_capability_entries() {
let fold = new_fold();
let kp = EntityKeypair::generate();
let ann = SignedAnnouncement::sign(
&kp,
CapabilityFold::KIND_ID,
0x100,
0xA,
1,
EnvelopeMeta {
ttl_secs: Some(0),
..Default::default()
},
CapabilityMembership {
class_hash: 0x100,
tags: vec!["gpu".into()],
hardware: None,
state: NodeState::Idle,
region: None,
price_quote: None,
reflex_addr: None,
allowed_nodes: Vec::new(),
allowed_subnets: Vec::new(),
allowed_groups: Vec::new(),
metadata: BTreeMap::new(),
},
)
.unwrap();
fold.apply(ann).unwrap();
assert_eq!(fold.stats().entries, 1);
// Short pause before sweeping — presumably so the zero TTL has definitely elapsed.
std::thread::sleep(Duration::from_millis(10));
let n = fold.sweep_expired_now();
assert_eq!(n, 1);
assert_eq!(fold.stats().entries, 0);
assert_eq!(fold.stats().expiries, 1);
assert!(fold
.query(CapabilityQuery::HasAllTags(vec!["gpu".into()]))
.is_empty());
}
// End-to-end through the registry: an encoded, signed announcement dispatched
// via `FoldRegistry` reaches the registered capability fold and is inserted.
#[test]
fn capability_fold_plugs_into_registry_and_dispatches_signed_envelopes() {
let registry = FoldRegistry::new();
let fold: Arc<Fold<CapabilityFold>> = Arc::new(new_fold());
registry.register(fold.clone());
let kp = EntityKeypair::generate();
let ann = sign_cap(
&kp,
0xA,
1,
0x100,
vec!["gpu"],
NodeState::Idle,
Some("us-east"),
);
let bytes = ann.encode().expect("encode");
let outcome = registry.dispatch(&bytes, kp.entity_id()).expect("dispatch");
assert_eq!(outcome, ApplyOutcome::Inserted);
assert_eq!(fold.stats().entries, 1);
}
}