use serde::{Deserialize, Serialize};
use std::cell::OnceCell;
use std::collections::{BTreeMap, HashSet};
use std::hash::Hash;
use crate::adapter::net::behavior::tag::Tag;
/// GPU vendor identifier with a fixed `u8` discriminant (`#[repr(u8)]`).
///
/// `Unknown` is the `Default` variant and is also what `From<u8>` returns for
/// any byte outside `1..=5`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[repr(u8)]
pub enum GpuVendor {
/// Vendor not known or not recognised (discriminant 0).
#[default]
Unknown = 0,
/// NVIDIA (discriminant 1).
Nvidia = 1,
/// AMD (discriminant 2).
Amd = 2,
/// Intel (discriminant 3).
Intel = 3,
/// Apple (discriminant 4).
Apple = 4,
/// Qualcomm (discriminant 5).
Qualcomm = 5,
}
impl From<u8> for GpuVendor {
fn from(v: u8) -> Self {
match v {
1 => Self::Nvidia,
2 => Self::Amd,
3 => Self::Intel,
4 => Self::Apple,
5 => Self::Qualcomm,
_ => Self::Unknown,
}
}
}
/// Description of a single GPU.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct GpuInfo {
/// Vendor of the GPU.
pub vendor: GpuVendor,
/// Model name.
pub model: String,
/// VRAM size (the `_gb` suffix indicates gigabytes); summed across GPUs by
/// `HardwareCapabilities::total_vram_gb`.
pub vram_gb: u32,
/// Number of compute units; 0 unless set via `with_compute_units`.
pub compute_units: u16,
/// Number of tensor cores; 0 unless set via `with_tensor_cores`.
pub tensor_cores: u16,
/// FP16 throughput in TFLOPS multiplied by 10, as written by `with_fp16_tflops`.
pub fp16_tflops_x10: u32,
}
impl Default for GpuInfo {
    /// Returns a placeholder GPU: unknown vendor, empty model name and every
    /// numeric field set to zero.
    fn default() -> Self {
        let vendor = GpuVendor::Unknown;
        let model = String::default();
        let (vram_gb, fp16_tflops_x10) = (0_u32, 0_u32);
        let (compute_units, tensor_cores) = (0_u16, 0_u16);
        Self {
            vendor,
            model,
            vram_gb,
            compute_units,
            tensor_cores,
            fp16_tflops_x10,
        }
    }
}
impl GpuInfo {
    /// Creates a GPU description with the given vendor, model name and VRAM
    /// size; all remaining fields take their `Default` values.
    pub fn new(vendor: GpuVendor, model: impl Into<String>, vram_gb: u32) -> Self {
        let model = model.into();
        let mut info = Self::default();
        info.vendor = vendor;
        info.model = model;
        info.vram_gb = vram_gb;
        info
    }

    /// Sets the compute-unit count and returns the updated value.
    pub fn with_compute_units(self, units: u16) -> Self {
        Self {
            compute_units: units,
            ..self
        }
    }

    /// Sets the tensor-core count and returns the updated value.
    pub fn with_tensor_cores(self, cores: u16) -> Self {
        Self {
            tensor_cores: cores,
            ..self
        }
    }

    /// Stores FP16 throughput as tenths of a TFLOPS in `fp16_tflops_x10`.
    ///
    /// Negative and NaN inputs are stored as 0 (`f32::max` ignores a NaN
    /// operand); infinite or too-large inputs are stored as `u32::MAX`. The
    /// fractional part beyond one decimal is truncated by the cast.
    pub fn with_fp16_tflops(mut self, tflops: f32) -> Self {
        let tenths = (tflops * 10.0).max(0.0);
        let fits_in_u32 = tenths.is_finite() && tenths < u32::MAX as f32;
        self.fp16_tflops_x10 = match fits_in_u32 {
            true => tenths as u32,
            false => u32::MAX,
        };
        self
    }
}
/// Kind of non-GPU accelerator, with a fixed `u8` discriminant (`#[repr(u8)]`).
///
/// `Unknown` is the `Default` variant and is also what `From<u8>` returns for
/// any byte outside `1..=5`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[repr(u8)]
pub enum AcceleratorType {
/// Accelerator kind not known or not recognised (discriminant 0).
#[default]
Unknown = 0,
/// TPU (discriminant 1).
Tpu = 1,
/// NPU (discriminant 2).
Npu = 2,
/// FPGA (discriminant 3).
Fpga = 3,
/// ASIC (discriminant 4).
Asic = 4,
/// DSP (discriminant 5).
Dsp = 5,
}
impl From<u8> for AcceleratorType {
fn from(v: u8) -> Self {
match v {
1 => Self::Tpu,
2 => Self::Npu,
3 => Self::Fpga,
4 => Self::Asic,
5 => Self::Dsp,
_ => Self::Unknown,
}
}
}
/// Description of a non-GPU accelerator.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AcceleratorInfo {
/// Kind of accelerator.
pub accel_type: AcceleratorType,
/// Model name.
pub model: String,
/// Accelerator memory (the `_gb` suffix indicates gigabytes); `new` sets it to 0.
pub memory_gb: u32,
/// NOTE(review): presumably throughput in TOPS multiplied by 10, mirroring
/// `GpuInfo::fp16_tflops_x10` — confirm. `new` sets it to 0.
pub tops_x10: u16,
}
impl AcceleratorInfo {
    /// Creates an accelerator description of the given kind and model name,
    /// with `memory_gb` and `tops_x10` both set to zero.
    pub fn new(accel_type: AcceleratorType, model: impl Into<String>) -> Self {
        let model: String = model.into();
        let (memory_gb, tops_x10) = (0_u32, 0_u16);
        Self {
            accel_type,
            model,
            memory_gb,
            tops_x10,
        }
    }
}
/// Hardware inventory of a node. All fields default to zero / empty / `None`.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
pub struct HardwareCapabilities {
/// CPU core count, set together with `cpu_threads` by `with_cpu`.
pub cpu_cores: u16,
/// CPU thread count, set together with `cpu_cores` by `with_cpu`.
pub cpu_threads: u16,
/// System memory (the `_gb` suffix indicates gigabytes).
pub memory_gb: u32,
/// Primary GPU; `has_gpu` and `gpu_vendor` look only at this field.
pub gpu: Option<GpuInfo>,
/// GPUs beyond the primary one; counted by `gpu_count` and `total_vram_gb`.
pub additional_gpus: Vec<GpuInfo>,
/// Storage capacity (the `_gb` suffix indicates gigabytes).
pub storage_gb: u64,
/// Network bandwidth (the `_gbps` suffix indicates gigabits per second).
pub network_gbps: u32,
/// Non-GPU accelerators, appended by `add_accelerator`.
pub accelerators: Vec<AcceleratorInfo>,
}
impl HardwareCapabilities {
    /// Creates an empty hardware description (same as `Default`).
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the CPU core and thread counts.
    pub fn with_cpu(mut self, cores: u16, threads: u16) -> Self {
        self.cpu_cores = cores;
        self.cpu_threads = threads;
        self
    }

    /// Sets the system memory size.
    pub fn with_memory(mut self, memory_gb: u32) -> Self {
        self.memory_gb = memory_gb;
        self
    }

    /// Sets (or replaces) the primary GPU.
    pub fn with_gpu(mut self, gpu: GpuInfo) -> Self {
        self.gpu = Some(gpu);
        self
    }

    /// Appends a GPU to `additional_gpus`; the primary GPU is left untouched.
    pub fn add_gpu(mut self, gpu: GpuInfo) -> Self {
        self.additional_gpus.push(gpu);
        self
    }

    /// Sets the storage capacity.
    pub fn with_storage(mut self, storage_gb: u64) -> Self {
        self.storage_gb = storage_gb;
        self
    }

    /// Sets the network bandwidth.
    pub fn with_network(mut self, network_gbps: u32) -> Self {
        self.network_gbps = network_gbps;
        self
    }

    /// Appends a non-GPU accelerator.
    pub fn add_accelerator(mut self, accel: AcceleratorInfo) -> Self {
        self.accelerators.push(accel);
        self
    }

    /// Number of GPUs: the primary one (if present) plus all additional ones.
    pub fn gpu_count(&self) -> usize {
        usize::from(self.gpu.is_some()) + self.additional_gpus.len()
    }

    /// Total VRAM across the primary and additional GPUs.
    ///
    /// The sum saturates at `u32::MAX`. A plain `+` / `sum()` would panic in
    /// debug builds and wrap in release builds if the per-GPU values
    /// overflowed, which would make a huge advertised total look tiny.
    pub fn total_vram_gb(&self) -> u32 {
        self.gpu
            .iter()
            .chain(self.additional_gpus.iter())
            .fold(0_u32, |total, gpu| total.saturating_add(gpu.vram_gb))
    }

    /// Whether a primary GPU is set. Additional GPUs alone do not count.
    pub fn has_gpu(&self) -> bool {
        self.gpu.is_some()
    }

    /// Vendor of the primary GPU, or `None` when no primary GPU is set.
    pub fn gpu_vendor(&self) -> Option<GpuVendor> {
        self.gpu.as_ref().map(|g| g.vendor)
    }
}
/// Software inventory of a node. All fields default to empty / `None`.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
pub struct SoftwareCapabilities {
/// Operating system name, set by `with_os`.
pub os: String,
/// Operating system version, set by `with_os`.
pub os_version: String,
/// `(name, version)` pairs appended by `add_runtime`.
pub runtimes: Vec<(String, String)>,
/// `(name, version)` pairs appended by `add_framework`.
pub frameworks: Vec<(String, String)>,
/// CUDA version string, set by `with_cuda`; `None` when not set.
pub cuda_version: Option<String>,
/// NOTE(review): presumably `(name, version)` pairs like `runtimes`; no
/// builder for this field is visible here — confirm.
pub drivers: Vec<(String, String)>,
}
impl SoftwareCapabilities {
    /// Creates an empty software description (same as `Default`).
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets the operating system name and version.
    pub fn with_os(mut self, os: impl Into<String>, version: impl Into<String>) -> Self {
        self.os = os.into();
        self.os_version = version.into();
        self
    }

    /// Appends a `(name, version)` runtime entry. Duplicates are not filtered.
    pub fn add_runtime(mut self, name: impl Into<String>, version: impl Into<String>) -> Self {
        let entry = (name.into(), version.into());
        self.runtimes.push(entry);
        self
    }

    /// Appends a `(name, version)` framework entry. Duplicates are not filtered.
    pub fn add_framework(mut self, name: impl Into<String>, version: impl Into<String>) -> Self {
        let entry = (name.into(), version.into());
        self.frameworks.push(entry);
        self
    }

    /// Records the CUDA version.
    pub fn with_cuda(mut self, version: impl Into<String>) -> Self {
        let version: String = version.into();
        self.cuda_version = Some(version);
        self
    }

    /// Whether any runtime entry has exactly this name (case-sensitive);
    /// versions are ignored.
    pub fn has_runtime(&self, name: &str) -> bool {
        self.runtimes
            .iter()
            .map(|(runtime, _version)| runtime)
            .any(|runtime| runtime == name)
    }

    /// Whether any framework entry has exactly this name (case-sensitive);
    /// versions are ignored.
    pub fn has_framework(&self, name: &str) -> bool {
        self.frameworks
            .iter()
            .map(|(framework, _version)| framework)
            .any(|framework| framework == name)
    }
}
/// Input/output modality a model supports, with a fixed `u8` discriminant.
///
/// There is no `Default` derive; `From<u8>` falls back to `Text` for bytes
/// outside `0..=6`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[repr(u8)]
pub enum Modality {
/// Text (discriminant 0); also the modality `ModelCapability::new` starts with.
Text = 0,
/// Image (discriminant 1).
Image = 1,
/// Audio (discriminant 2).
Audio = 2,
/// Video (discriminant 3).
Video = 3,
/// Code (discriminant 4).
Code = 4,
/// Embedding (discriminant 5).
Embedding = 5,
/// Tool use (discriminant 6).
ToolUse = 6,
}
impl From<u8> for Modality {
fn from(v: u8) -> Self {
match v {
0 => Self::Text,
1 => Self::Image,
2 => Self::Audio,
3 => Self::Video,
4 => Self::Code,
5 => Self::Embedding,
6 => Self::ToolUse,
_ => Self::Text,
}
}
}
/// Description of one model a node can serve.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ModelCapability {
/// Model identifier; `CapabilitySet::model_ids` returns these.
pub model_id: String,
/// Model family name.
pub family: String,
/// Parameter count in billions multiplied by 10, as written by
/// `with_parameters` and read back by `parameters`.
pub parameters_b_x10: u32,
/// Context length; compared against `CapabilityFilter::min_context_length`.
pub context_length: u32,
/// Quantization label, or `None` when not set.
pub quantization: Option<String>,
/// Supported modalities; `new` starts with `[Modality::Text]`.
pub modalities: Vec<Modality>,
/// Tokens per second; used by `CapabilityRequirement::score` for the speed preference.
pub tokens_per_sec: u32,
/// Whether the model is loaded; used by `CapabilityRequirement::score`.
pub loaded: bool,
}
impl ModelCapability {
    /// Creates a model description with the given id and family.
    ///
    /// Numeric fields start at 0, `quantization` is `None`, `loaded` is
    /// `false`, and `modalities` starts as `[Modality::Text]`.
    pub fn new(model_id: impl Into<String>, family: impl Into<String>) -> Self {
        let model_id: String = model_id.into();
        let family: String = family.into();
        Self {
            model_id,
            family,
            parameters_b_x10: 0,
            context_length: 0,
            quantization: None,
            modalities: vec![Modality::Text],
            tokens_per_sec: 0,
            loaded: false,
        }
    }

    /// Stores the parameter count, given in billions, as tenths of a billion.
    ///
    /// The float-to-integer cast truncates toward zero and saturates:
    /// negative and NaN inputs become 0, oversized inputs become `u32::MAX`.
    pub fn with_parameters(mut self, billions: f32) -> Self {
        let tenths = billions * 10.0;
        self.parameters_b_x10 = tenths as u32;
        self
    }

    /// Sets the context length.
    pub fn with_context_length(self, length: u32) -> Self {
        Self {
            context_length: length,
            ..self
        }
    }

    /// Sets the quantization label.
    pub fn with_quantization(mut self, quant: impl Into<String>) -> Self {
        let label: String = quant.into();
        self.quantization = Some(label);
        self
    }

    /// Adds a modality unless it is already listed.
    pub fn add_modality(mut self, modality: Modality) -> Self {
        let already_listed = self.modalities.iter().any(|known| *known == modality);
        if !already_listed {
            self.modalities.push(modality);
        }
        self
    }

    /// Sets the tokens-per-second figure.
    pub fn with_tokens_per_sec(self, tps: u32) -> Self {
        Self {
            tokens_per_sec: tps,
            ..self
        }
    }

    /// Sets the loaded flag.
    pub fn with_loaded(self, loaded: bool) -> Self {
        Self { loaded, ..self }
    }

    /// Parameter count in billions, reconstructed from `parameters_b_x10`.
    pub fn parameters(&self) -> f32 {
        let tenths = self.parameters_b_x10 as f32;
        tenths / 10.0
    }
}
/// Description of one tool a node exposes.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ToolCapability {
/// Tool identifier; also embedded in the schema metadata keys.
pub tool_id: String,
/// Tool name.
pub name: String,
/// Version string; `new` sets it to `"1.0.0"`.
pub version: String,
/// Input schema text, if any. `CapabilitySet::set_tools` stores it in
/// metadata under `input_schema_metadata_key`.
pub input_schema: Option<String>,
/// Output schema text, if any. `CapabilitySet::set_tools` stores it in
/// metadata under `output_schema_metadata_key`.
pub output_schema: Option<String>,
/// Dependencies appended by the `requires` builder method.
pub requires: Vec<String>,
/// Estimated execution time (the `_ms` suffix indicates milliseconds).
pub estimated_time_ms: u32,
/// Stateless flag; `new` sets it to `true`.
pub stateless: bool,
}
impl ToolCapability {
    /// Metadata key under which the input schema of `tool_id` is stored.
    pub fn input_schema_metadata_key(tool_id: &str) -> String {
        format!("tool::{tool_id}::input_schema")
    }

    /// Metadata key under which the output schema of `tool_id` is stored.
    pub fn output_schema_metadata_key(tool_id: &str) -> String {
        format!("tool::{tool_id}::output_schema")
    }

    /// Creates a tool description with the given id and name.
    ///
    /// The version starts as `"1.0.0"`, the tool is marked stateless, no
    /// schemas or dependencies are set and the estimated time is 0.
    pub fn new(tool_id: impl Into<String>, name: impl Into<String>) -> Self {
        let tool_id: String = tool_id.into();
        let name: String = name.into();
        Self {
            tool_id,
            name,
            version: String::from("1.0.0"),
            input_schema: None,
            output_schema: None,
            requires: Vec::new(),
            estimated_time_ms: 0,
            stateless: true,
        }
    }

    /// Replaces the version string.
    pub fn with_version(mut self, version: impl Into<String>) -> Self {
        let version: String = version.into();
        self.version = version;
        self
    }

    /// Sets the input schema text.
    pub fn with_input_schema(mut self, schema: impl Into<String>) -> Self {
        let schema: String = schema.into();
        self.input_schema = Some(schema);
        self
    }

    /// Sets the output schema text.
    pub fn with_output_schema(mut self, schema: impl Into<String>) -> Self {
        let schema: String = schema.into();
        self.output_schema = Some(schema);
        self
    }

    /// Appends a dependency. Duplicates are not filtered.
    pub fn requires(mut self, dep: impl Into<String>) -> Self {
        let dep: String = dep.into();
        self.requires.push(dep);
        self
    }

    /// Sets the estimated execution time in milliseconds.
    pub fn with_estimated_time(self, ms: u32) -> Self {
        Self {
            estimated_time_ms: ms,
            ..self
        }
    }

    /// Sets the stateless flag.
    pub fn with_stateless(self, stateless: bool) -> Self {
        Self { stateless, ..self }
    }
}
/// Resource limits a node advertises. All fields default to 0.
///
/// NOTE(review): whether 0 means "unlimited" or "none" is not established by
/// the code visible here — confirm against consumers.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
pub struct ResourceLimits {
/// Maximum concurrent requests, set by `with_max_concurrent`.
pub max_concurrent_requests: u32,
/// Maximum tokens per request, set by `with_max_tokens`.
pub max_tokens_per_request: u32,
/// Rate limit (the `_rpm` suffix indicates requests per minute), set by `with_rate_limit`.
pub rate_limit_rpm: u32,
/// Maximum batch size, set by `with_max_batch`.
pub max_batch_size: u32,
/// Maximum input size in bytes; no builder method is visible for this field.
pub max_input_bytes: u32,
/// Maximum output size in bytes; no builder method is visible for this field.
pub max_output_bytes: u32,
}
impl ResourceLimits {
    /// Creates limits with every field set to 0 (same as `Default`).
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets `max_concurrent_requests`.
    pub fn with_max_concurrent(self, max: u32) -> Self {
        Self {
            max_concurrent_requests: max,
            ..self
        }
    }

    /// Sets `max_tokens_per_request`.
    pub fn with_max_tokens(self, max: u32) -> Self {
        Self {
            max_tokens_per_request: max,
            ..self
        }
    }

    /// Sets `rate_limit_rpm`.
    pub fn with_rate_limit(self, rpm: u32) -> Self {
        Self {
            rate_limit_rpm: rpm,
            ..self
        }
    }

    /// Sets `max_batch_size`.
    pub fn with_max_batch(self, max: u32) -> Self {
        Self {
            max_batch_size: max,
            ..self
        }
    }
}
/// Prefix of a tenant scope tag; `CapabilitySet::with_tenant_scope` appends the tenant id.
pub const TAG_SCOPE_TENANT_PREFIX: &str = "scope:tenant:";
/// Prefix of a region scope tag; `CapabilitySet::with_region_scope` appends the region name.
pub const TAG_SCOPE_REGION_PREFIX: &str = "scope:region:";
/// Full text of the subnet-local scope tag, inserted by `CapabilitySet::with_subnet_local_scope`.
pub const TAG_SCOPE_SUBNET_LOCAL: &str = "scope:subnet-local";
/// Full text of the global scope tag. No builder in this part of the file inserts it.
pub const TAG_SCOPE_GLOBAL: &str = "scope:global";
/// Visibility scope of a candidate's capabilities, as evaluated by `matches_scope`.
///
/// How a scope is derived from tags is not shown in this part of the file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum CapabilityScope {
/// Matches every filter, including tenant and region filters.
Global,
/// Matches only `ScopeFilter::SameSubnet`, and only when the caller reports the same subnet.
SubnetLocal,
/// Restricted to the listed tenants; never matches region filters.
Tenants(Vec<String>),
/// Restricted to the listed regions; never matches tenant filters.
Regions(Vec<String>),
/// Carries both lists; tenant filters consult `tenants`, region filters consult `regions`.
TenantsAndRegions {
/// Tenant ids consulted by tenant filters.
tenants: Vec<String>,
/// Region names consulted by region filters.
regions: Vec<String>,
},
}
/// Extracts subnet and group membership from a tag set.
///
/// Each tag is rendered to text and offered first to `SubnetId::from_tag`,
/// then (only if that declined) to `GroupId::from_tag`. Duplicates are
/// dropped on both sides.
///
/// Returns the subnet only when exactly one distinct subnet was found
/// (zero or several yield `None`), together with the distinct groups sorted
/// by their inner `.0` value.
#[allow(dead_code)]
pub(crate) fn parse_membership_tags(
    tags: &HashSet<Tag>,
) -> (Option<super::subnet::SubnetId>, Vec<super::group::GroupId>) {
    let mut distinct_subnets: Vec<super::subnet::SubnetId> = Vec::new();
    let mut groups: Vec<super::group::GroupId> = Vec::new();

    for text in tags.iter().map(|tag| tag.to_string()) {
        match super::subnet::SubnetId::from_tag(&text) {
            // A subnet tag is never also tried as a group tag.
            Some(subnet) => {
                if !distinct_subnets.contains(&subnet) {
                    distinct_subnets.push(subnet);
                }
            }
            None => {
                if let Some(group) = super::group::GroupId::from_tag(&text) {
                    if !groups.contains(&group) {
                        groups.push(group);
                    }
                }
            }
        }
    }

    // Ambiguous (several) or absent subnet tags both resolve to "no subnet".
    let subnet = match distinct_subnets.as_slice() {
        [only] => Some(*only),
        _ => None,
    };
    // `tags` is a HashSet, so sort to make the group order deterministic.
    groups.sort_by_key(|g| g.0);
    (subnet, groups)
}
/// Caller-side scope filter evaluated by `matches_scope` against a
/// candidate's `CapabilityScope`.
#[derive(Debug, Clone)]
pub enum ScopeFilter<'a> {
/// Accepts every candidate except subnet-local ones.
Any,
/// Accepts only `CapabilityScope::Global` candidates.
GlobalOnly,
/// Accepts any candidate (including subnet-local ones) when the caller is in the same subnet.
SameSubnet,
/// Accepts global candidates and candidates listing this tenant.
Tenant(&'a str),
/// Accepts global candidates and candidates listing at least one of these tenants.
Tenants(&'a [&'a str]),
/// Accepts global candidates and candidates listing this region.
Region(&'a str),
/// Accepts global candidates and candidates listing at least one of these regions.
Regions(&'a [&'a str]),
}
/// Decides whether a candidate's scope satisfies a caller's scope filter.
///
/// Rules, in order:
/// * A `SubnetLocal` candidate matches only `ScopeFilter::SameSubnet`, and
///   only when `same_subnet` is true.
/// * `Any` accepts everything else; `GlobalOnly` accepts only `Global`;
///   `SameSubnet` accepts everything else iff `same_subnet` is true.
/// * Tenant / region filters accept `Global` candidates, and otherwise need
///   an overlap with the candidate's tenant / region list. A candidate that
///   carries only the other kind of list never matches.
pub(crate) fn matches_scope(
    candidate_scope: &CapabilityScope,
    filter: &ScopeFilter<'_>,
    same_subnet: bool,
) -> bool {
    use CapabilityScope as S;
    use ScopeFilter as F;

    // Subnet-local candidates are invisible to every filter but SameSubnet.
    if matches!(candidate_scope, S::SubnetLocal) {
        return matches!(filter, F::SameSubnet) && same_subnet;
    }

    let is_global = matches!(candidate_scope, S::Global);
    // Lists the candidate exposes on each axis; empty when it has none.
    let tenants: &[String] = match candidate_scope {
        S::Tenants(ts) | S::TenantsAndRegions { tenants: ts, .. } => ts.as_slice(),
        S::Global | S::SubnetLocal | S::Regions(_) => &[],
    };
    let regions: &[String] = match candidate_scope {
        S::Regions(rs) | S::TenantsAndRegions { regions: rs, .. } => rs.as_slice(),
        S::Global | S::SubnetLocal | S::Tenants(_) => &[],
    };

    match filter {
        F::Any => true,
        F::GlobalOnly => is_global,
        F::SameSubnet => same_subnet,
        F::Tenant(t) => is_global || tenants.iter().any(|x| x == t),
        F::Tenants(wanted) => {
            is_global || tenants.iter().any(|x| wanted.iter().any(|w| w == x))
        }
        F::Region(r) => is_global || regions.iter().any(|x| x == r),
        F::Regions(wanted) => {
            is_global || regions.iter().any(|x| wanted.iter().any(|w| w == x))
        }
    }
}
/// A node's capabilities: a set of typed tags plus string metadata.
///
/// Hardware, software, model, tool and limit descriptions are encoded into
/// `tags` by the `set_*` methods and decoded again through `views()`.
#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize)]
pub struct CapabilitySet {
/// Capability tags. Serialized in sorted order via `serialize_tags_sorted`;
/// defaults to empty when absent on deserialization.
#[serde(default, serialize_with = "serialize_tags_sorted")]
pub tags: HashSet<Tag>,
/// Key/value metadata, ordered by key. `set_tools` stores tool schemas here;
/// defaults to empty when absent on deserialization.
#[serde(default)]
pub metadata: BTreeMap<String, String>,
}
impl CapabilitySet {
pub fn new() -> Self {
Self::default()
}
pub fn with_hardware(mut self, hardware: HardwareCapabilities) -> Self {
self.set_hardware(hardware);
self
}
pub fn with_software(mut self, software: SoftwareCapabilities) -> Self {
self.set_software(software);
self
}
pub fn add_model(mut self, model: ModelCapability) -> Self {
let mut models = self.views().models().clone();
models.push(model);
self.set_models(models);
self
}
pub fn add_tool(mut self, tool: ToolCapability) -> Self {
let mut tools = self.views().tools().clone();
tools.push(tool);
self.set_tools(tools);
self
}
pub fn add_tag(mut self, tag: impl Into<String>) -> Self {
let s: String = tag.into();
if let Ok(t) = Tag::parse_user(&s) {
self.tags.insert(t);
}
self
}
#[cfg(feature = "dataforts")]
pub fn with_blob_capability(self, blob: super::dataforts_capabilities::BlobCapability) -> Self {
blob.write_into(self)
}
#[cfg(feature = "dataforts")]
pub fn with_greedy_capability(
self,
greedy: super::dataforts_capabilities::GreedyCapability,
) -> Self {
greedy.write_into(self)
}
#[cfg(feature = "dataforts")]
pub fn with_gravity_capability(
self,
gravity: super::dataforts_capabilities::GravityCapability,
) -> Self {
gravity.write_into(self)
}
pub fn with_tenant_scope(mut self, tenant_id: impl Into<String>) -> Self {
let id = tenant_id.into();
if id.is_empty() {
return self;
}
let tag = format!("{TAG_SCOPE_TENANT_PREFIX}{id}");
if let Ok(t) = Tag::parse(&tag) {
self.tags.insert(t);
}
self
}
pub fn with_region_scope(mut self, region: impl Into<String>) -> Self {
let name = region.into();
if name.is_empty() {
return self;
}
let tag = format!("{TAG_SCOPE_REGION_PREFIX}{name}");
if let Ok(t) = Tag::parse(&tag) {
self.tags.insert(t);
}
self
}
pub fn with_subnet_local_scope(mut self) -> Self {
if let Ok(t) = Tag::parse(TAG_SCOPE_SUBNET_LOCAL) {
self.tags.insert(t);
}
self
}
pub fn require_chain(mut self, chain_hash: impl AsRef<str>) -> Self {
let hash = chain_hash.as_ref();
if hash.is_empty() {
return self;
}
if let Ok(t) = Tag::parse(&format!("causal:{hash}")) {
self.tags.insert(t);
}
self
}
pub fn require_chain_tip(mut self, chain_hash: impl AsRef<str>, tip_seq: u64) -> Self {
let hash = chain_hash.as_ref();
if hash.is_empty() {
return self;
}
if let Ok(t) = Tag::parse(&format!("causal:{hash}:{tip_seq}")) {
self.tags.insert(t);
}
self
}
pub fn require_chain_range(
mut self,
chain_hash: impl AsRef<str>,
start_seq: u64,
end_seq: u64,
) -> Self {
let hash = chain_hash.as_ref();
if hash.is_empty() || start_seq >= end_seq {
return self;
}
if let Ok(t) = Tag::parse(&format!("causal:{hash}[{start_seq}..{end_seq}]")) {
self.tags.insert(t);
}
self
}
pub fn require_any_chain<I, S>(mut self, chain_hashes: I) -> Self
where
I: IntoIterator<Item = S>,
S: AsRef<str>,
{
for hash in chain_hashes {
self = self.require_chain(hash);
}
self
}
pub fn from_fork(mut self, parent_chain_hash: impl AsRef<str>) -> Self {
let hash = parent_chain_hash.as_ref();
if hash.is_empty() {
return self;
}
if let Ok(t) = Tag::parse(&format!("fork-of:{hash}")) {
self.tags.insert(t);
}
self
}
pub fn heat_level(mut self, chain_hash: impl AsRef<str>, rate: f64) -> Self {
let hash = chain_hash.as_ref();
if hash.is_empty() {
return self;
}
let clamped = if rate.is_finite() {
rate.clamp(0.0, 1.0)
} else {
return self;
};
if let Ok(t) = Tag::parse(&format!("heat:{hash}={clamped:.2}")) {
self.tags.insert(t);
}
self
}
pub fn with_limits(mut self, limits: ResourceLimits) -> Self {
self.set_limits(limits);
self
}
pub fn with_metadata(self, key: impl Into<String>, value: impl Into<String>) -> Self {
let key: String = key.into();
if super::schema::AXIS_SCHEMA
.metadata_reserved_prefixes
.iter()
.any(|p| key.starts_with(*p))
{
return self;
}
self.with_metadata_unchecked(key, value)
}
pub(crate) fn with_metadata_unchecked(
mut self,
key: impl Into<String>,
value: impl Into<String>,
) -> Self {
self.metadata.insert(key.into(), value.into());
self
}
pub fn set_hardware(&mut self, hardware: HardwareCapabilities) {
self.tags
.retain(|t| !crate::adapter::net::behavior::tag_codec::is_hardware_owned_tag(t));
self.tags
.extend(crate::adapter::net::behavior::tag_codec::hardware_to_tags(
&hardware,
));
}
pub fn set_software(&mut self, software: SoftwareCapabilities) {
self.tags
.retain(|t| !crate::adapter::net::behavior::tag_codec::is_software_owned_tag(t));
self.tags
.extend(crate::adapter::net::behavior::tag_codec::software_to_tags(
&software,
));
}
pub fn set_limits(&mut self, limits: ResourceLimits) {
self.tags
.retain(|t| !crate::adapter::net::behavior::tag_codec::is_resource_limits_owned_tag(t));
self.tags
.extend(crate::adapter::net::behavior::tag_codec::resource_limits_to_tags(&limits));
}
pub fn set_models(&mut self, models: Vec<ModelCapability>) {
self.tags
.retain(|t| !crate::adapter::net::behavior::tag_codec::is_models_owned_tag(t));
self.tags
.extend(crate::adapter::net::behavior::tag_codec::models_to_tags(
&models,
));
}
pub fn set_tools(&mut self, tools: Vec<ToolCapability>) {
self.tags
.retain(|t| !crate::adapter::net::behavior::tag_codec::is_tools_owned_tag(t));
let new_ids: HashSet<&str> = tools.iter().map(|t| t.tool_id.as_str()).collect();
self.metadata.retain(|key, _| {
let Some(rest) = key.strip_prefix("tool::") else {
return true;
};
let Some((id, _suffix)) = rest.split_once("::") else {
return true;
};
new_ids.contains(id)
});
self.tags
.extend(crate::adapter::net::behavior::tag_codec::tools_to_tags(
&tools,
));
for tool in &tools {
if let Some(schema) = &tool.input_schema {
self.metadata.insert(
ToolCapability::input_schema_metadata_key(&tool.tool_id),
schema.clone(),
);
}
if let Some(schema) = &tool.output_schema {
self.metadata.insert(
ToolCapability::output_schema_metadata_key(&tool.tool_id),
schema.clone(),
);
}
}
}
pub fn has_tag(&self, tag: &str) -> bool {
let Ok(parsed) = Tag::parse(tag) else {
return false;
};
self.tags.iter().any(|t| t.semantic_eq(&parsed))
}
pub fn has_model(&self, model_id: &str) -> bool {
use crate::adapter::net::behavior::tag::TaxonomyAxis;
self.tags.iter().any(|tag| {
let Some(key) = tag.axis_key() else {
return false;
};
if key.axis != TaxonomyAxis::Software {
return false;
}
let Some(rest) = key.key.strip_prefix("model.") else {
return false;
};
let Some((_idx, sub)) = rest.split_once('.') else {
return false;
};
sub == "id" && tag.value() == Some(model_id)
})
}
pub fn has_tool(&self, tool_id: &str) -> bool {
use crate::adapter::net::behavior::tag::TaxonomyAxis;
self.tags.iter().any(|tag| {
let Some(key) = tag.axis_key() else {
return false;
};
if key.axis != TaxonomyAxis::Software {
return false;
}
let Some(rest) = key.key.strip_prefix("tool.") else {
return false;
};
let Some((_idx, sub)) = rest.split_once('.') else {
return false;
};
sub == "tool_id" && tag.value() == Some(tool_id)
})
}
pub fn has_gpu(&self) -> bool {
use crate::adapter::net::behavior::tag::TaxonomyAxis;
self.tags.contains(&Tag::AxisPresent {
axis: TaxonomyAxis::Hardware,
key: "gpu".into(),
})
}
pub fn model_ids(&self) -> Vec<String> {
self.views()
.models()
.iter()
.map(|m| m.model_id.clone())
.collect()
}
pub fn tool_ids(&self) -> Vec<String> {
self.views()
.tools()
.iter()
.map(|t| t.tool_id.clone())
.collect()
}
pub fn to_bytes(&self) -> Vec<u8> {
serde_json::to_vec(self).unwrap_or_default()
}
pub fn from_bytes(data: &[u8]) -> Option<Self> {
serde_json::from_slice(data).ok()
}
pub fn diff(&self, prev: &CapabilitySet) -> CapabilitySetDiff {
let added_tags: HashSet<Tag> = self
.tags
.iter()
.filter(|t| !prev.tags.iter().any(|p| p.semantic_eq(t)))
.cloned()
.collect();
let removed_tags: HashSet<Tag> = prev
.tags
.iter()
.filter(|t| !self.tags.iter().any(|c| c.semantic_eq(t)))
.cloned()
.collect();
let mut changed_metadata = Vec::new();
let mut prev_iter = prev.metadata.iter().peekable();
let mut curr_iter = self.metadata.iter().peekable();
loop {
match (prev_iter.peek(), curr_iter.peek()) {
(Some((pk, pv)), Some((ck, cv))) => match pk.cmp(ck) {
std::cmp::Ordering::Less => {
changed_metadata.push(MetadataChange::Removed {
key: (*pk).clone(),
prev_value: (*pv).clone(),
});
prev_iter.next();
}
std::cmp::Ordering::Greater => {
changed_metadata.push(MetadataChange::Added {
key: (*ck).clone(),
value: (*cv).clone(),
});
curr_iter.next();
}
std::cmp::Ordering::Equal => {
if pv != cv {
changed_metadata.push(MetadataChange::Updated {
key: (*pk).clone(),
prev_value: (*pv).clone(),
new_value: (*cv).clone(),
});
}
prev_iter.next();
curr_iter.next();
}
},
(Some((pk, pv)), None) => {
changed_metadata.push(MetadataChange::Removed {
key: (*pk).clone(),
prev_value: (*pv).clone(),
});
prev_iter.next();
}
(None, Some((ck, cv))) => {
changed_metadata.push(MetadataChange::Added {
key: (*ck).clone(),
value: (*cv).clone(),
});
curr_iter.next();
}
(None, None) => break,
}
}
CapabilitySetDiff {
added_tags,
removed_tags,
changed_metadata,
}
}
pub fn views(&self) -> CapabilityViews<'_> {
CapabilityViews {
caps: self,
sorted_tags: OnceCell::new(),
hardware: OnceCell::new(),
software: OnceCell::new(),
resource_limits: OnceCell::new(),
models: OnceCell::new(),
tools: OnceCell::new(),
}
}
pub fn typed_tags(&self) -> std::collections::HashSet<crate::adapter::net::behavior::tag::Tag> {
crate::adapter::net::behavior::tag_codec::capability_set_to_tag_set(self)
}
pub fn from_typed_tags(
tags: &std::collections::HashSet<crate::adapter::net::behavior::tag::Tag>,
) -> Self {
crate::adapter::net::behavior::tag_codec::capability_set_from_tag_set(tags)
}
}
/// Typed views over a `CapabilitySet`, created by `CapabilitySet::views`.
///
/// Each view is computed on first access and cached in its `OnceCell`.
#[derive(Debug)]
pub struct CapabilityViews<'a> {
/// The capability set being viewed.
caps: &'a CapabilitySet,
/// Tags sorted by rendered text; input to every decoder below.
sorted_tags: OnceCell<Vec<Tag>>,
/// Cached hardware description.
hardware: OnceCell<HardwareCapabilities>,
/// Cached software description.
software: OnceCell<SoftwareCapabilities>,
/// Cached resource limits.
resource_limits: OnceCell<ResourceLimits>,
/// Cached model list.
models: OnceCell<Vec<ModelCapability>>,
/// Cached tool list, with schemas filled in from metadata.
tools: OnceCell<Vec<ToolCapability>>,
}
impl<'a> CapabilityViews<'a> {
    /// Tags sorted by rendered text, computed once and shared by all views.
    fn sorted_tags(&self) -> &Vec<Tag> {
        let tags = &self.caps.tags;
        self.sorted_tags.get_or_init(|| sorted_tag_vec(tags))
    }

    /// Hardware description decoded from the tags (cached after first call).
    pub fn hardware(&self) -> &HardwareCapabilities {
        self.hardware.get_or_init(|| {
            let tags = self.sorted_tags();
            crate::adapter::net::behavior::tag_codec::hardware_from_tags(tags)
        })
    }

    /// Software description decoded from the tags (cached after first call).
    pub fn software(&self) -> &SoftwareCapabilities {
        self.software.get_or_init(|| {
            let tags = self.sorted_tags();
            crate::adapter::net::behavior::tag_codec::software_from_tags(tags)
        })
    }

    /// Resource limits decoded from the tags (cached after first call).
    pub fn resource_limits(&self) -> &ResourceLimits {
        self.resource_limits.get_or_init(|| {
            let tags = self.sorted_tags();
            crate::adapter::net::behavior::tag_codec::resource_limits_from_tags(tags)
        })
    }

    /// Model list decoded from the tags (cached after first call).
    pub fn models(&self) -> &Vec<ModelCapability> {
        self.models.get_or_init(|| {
            let tags = self.sorted_tags();
            crate::adapter::net::behavior::tag_codec::models_from_tags(tags)
        })
    }

    /// Tool list decoded from the tags (cached after first call).
    ///
    /// Schemas are looked up in the set's metadata under each tool's schema
    /// keys; a metadata entry, when present, overwrites whatever the tag
    /// decoder produced for that schema.
    pub fn tools(&self) -> &Vec<ToolCapability> {
        self.tools.get_or_init(|| {
            let metadata = &self.caps.metadata;
            let mut decoded =
                crate::adapter::net::behavior::tag_codec::tools_from_tags(self.sorted_tags());
            for tool in decoded.iter_mut() {
                let input_key = ToolCapability::input_schema_metadata_key(&tool.tool_id);
                if let Some(schema) = metadata.get(&input_key) {
                    tool.input_schema = Some(schema.clone());
                }
                let output_key = ToolCapability::output_schema_metadata_key(&tool.tool_id);
                if let Some(schema) = metadata.get(&output_key) {
                    tool.output_schema = Some(schema.clone());
                }
            }
            decoded
        })
    }
}
/// Returns the tags as a vector ordered by their rendered (`to_string`) text.
///
/// A `HashSet` iterates in arbitrary order, so this gives decoders and the
/// serializer a deterministic sequence. Each tag is rendered exactly once:
/// `sort_by_cached_key` computes the keys up front instead of re-rendering
/// (and re-allocating) on every comparison. The sort is stable, like the
/// `sort_by_key` it replaces.
fn sorted_tag_vec(tags: &HashSet<Tag>) -> Vec<Tag> {
    let mut ordered: Vec<Tag> = tags.iter().cloned().collect();
    ordered.sort_by_cached_key(|tag| tag.to_string());
    ordered
}
/// Serde `serialize_with` helper for `CapabilitySet::tags`.
///
/// Emits the tags as a sequence ordered by rendered text, so the serialized
/// form does not depend on `HashSet` iteration order. `collect_seq` opens a
/// sequence sized from the iterator, serializes each element and closes it.
fn serialize_tags_sorted<S: serde::Serializer>(
    tags: &HashSet<Tag>,
    serializer: S,
) -> Result<S::Ok, S::Error> {
    let ordered = sorted_tag_vec(tags);
    serializer.collect_seq(ordered.iter())
}
impl From<&CapabilitySet> for HardwareCapabilities {
    /// Decodes the hardware description from the set's tags, sorted by
    /// rendered text. Unlike `CapabilitySet::views`, nothing is cached.
    fn from(caps: &CapabilitySet) -> Self {
        let ordered = sorted_tag_vec(&caps.tags);
        crate::adapter::net::behavior::tag_codec::hardware_from_tags(&ordered)
    }
}
impl From<&CapabilitySet> for SoftwareCapabilities {
    /// Decodes the software description from the set's tags, sorted by
    /// rendered text. Unlike `CapabilitySet::views`, nothing is cached.
    fn from(caps: &CapabilitySet) -> Self {
        let ordered = sorted_tag_vec(&caps.tags);
        crate::adapter::net::behavior::tag_codec::software_from_tags(&ordered)
    }
}
impl From<&CapabilitySet> for ResourceLimits {
    /// Decodes the resource limits from the set's tags, sorted by rendered
    /// text. Unlike `CapabilitySet::views`, nothing is cached.
    fn from(caps: &CapabilitySet) -> Self {
        let ordered = sorted_tag_vec(&caps.tags);
        crate::adapter::net::behavior::tag_codec::resource_limits_from_tags(&ordered)
    }
}
/// Result of `CapabilitySet::diff`: what changed from a previous set to the current one.
#[derive(Debug, Clone, PartialEq)]
pub struct CapabilitySetDiff {
/// Tags in the current set with no semantically equal tag in the previous set.
pub added_tags: HashSet<Tag>,
/// Tags in the previous set with no semantically equal tag in the current set.
pub removed_tags: HashSet<Tag>,
/// Metadata additions, removals and updates, in ascending key order.
pub changed_metadata: Vec<MetadataChange>,
}
impl CapabilitySetDiff {
    /// Whether the diff records no change at all: no added tags, no removed
    /// tags and no metadata changes.
    pub fn is_empty(&self) -> bool {
        let tags_changed = !self.added_tags.is_empty() || !self.removed_tags.is_empty();
        let metadata_changed = !self.changed_metadata.is_empty();
        !(tags_changed || metadata_changed)
    }
}
/// One metadata difference reported by `CapabilitySet::diff`.
#[derive(Debug, Clone, PartialEq)]
pub enum MetadataChange {
/// The key exists only in the current set.
Added {
/// Metadata key.
key: String,
/// Value in the current set.
value: String,
},
/// The key exists only in the previous set.
Removed {
/// Metadata key.
key: String,
/// Value it had in the previous set.
prev_value: String,
},
/// The key exists in both sets with different values.
Updated {
/// Metadata key.
key: String,
/// Value in the previous set.
prev_value: String,
/// Value in the current set.
new_value: String,
},
}
/// A node's capability announcement, optionally signed.
///
/// The signature covers the JSON form of the announcement with `signature`
/// cleared and `hop_count` zeroed (see `signed_payload`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CapabilityAnnouncement {
/// Identifier of the announcing node.
pub node_id: u64,
/// Entity identity; `verify` checks the signature against it.
pub entity_id: super::super::identity::EntityId,
/// Announcement version supplied by the caller of `new`.
pub version: u64,
/// Creation time in nanoseconds since the Unix epoch, set by `new`.
pub timestamp_ns: u64,
/// Time-to-live in seconds; `is_expired` compares the announcement's age against it.
pub ttl_secs: u32,
/// The announced capabilities.
pub capabilities: CapabilitySet,
/// Signature over the signed payload; omitted from serialization when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub signature: Option<Signature64>,
/// Hop counter; omitted from serialization when 0 and excluded from the signed payload.
#[serde(default, skip_serializing_if = "is_hop_count_zero")]
pub hop_count: u8,
/// Optional socket address set via `with_reflex_addr`; omitted from serialization when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub reflex_addr: Option<std::net::SocketAddr>,
/// Node allow-list; omitted when empty. `from_bytes` rejects more than `MAX_ALLOW_LIST_LEN` entries.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub allowed_nodes: Vec<u64>,
/// Subnet allow-list; omitted when empty. `from_bytes` rejects more than `MAX_ALLOW_LIST_LEN` entries.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub allowed_subnets: Vec<super::subnet::SubnetId>,
/// Group allow-list; omitted when empty. `from_bytes` rejects more than `MAX_ALLOW_LIST_LEN` entries.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub allowed_groups: Vec<super::group::GroupId>,
}
/// Maximum number of entries accepted in each allow-list of a
/// `CapabilityAnnouncement`; `CapabilityAnnouncement::from_bytes` rejects
/// announcements that exceed it.
pub const MAX_ALLOW_LIST_LEN: usize = 64;
/// Serde `skip_serializing_if` predicate: true when the hop count is 0, so
/// the field is left out of the serialized announcement.
fn is_hop_count_zero(v: &u8) -> bool {
    matches!(*v, 0)
}
/// NOTE(review): presumably the upper bound for `CapabilityAnnouncement::hop_count`
/// when announcements are forwarded; no use is visible in this part of the
/// file — confirm against the forwarding code.
pub const MAX_CAPABILITY_HOPS: u8 = 16;
/// A 64-byte signature. Serialized as a hex string for human-readable
/// formats and as raw bytes otherwise (see its `Serialize` impl).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Signature64(pub [u8; 64]);
impl Serialize for Signature64 {
    /// Writes the signature as a hex string when the format is
    /// human-readable, and as a raw 64-byte blob otherwise.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        if !serializer.is_human_readable() {
            return serializer.serialize_bytes(&self.0);
        }
        let encoded = hex::encode(self.0);
        serializer.serialize_str(&encoded)
    }
}
impl<'de> Deserialize<'de> for Signature64 {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
if deserializer.is_human_readable() {
let hex_str = String::deserialize(deserializer)?;
let bytes = hex::decode(&hex_str).map_err(serde::de::Error::custom)?;
if bytes.len() != 64 {
return Err(serde::de::Error::custom("signature must be 64 bytes"));
}
let mut arr = [0u8; 64];
arr.copy_from_slice(&bytes);
Ok(Signature64(arr))
} else {
let bytes = <Vec<u8>>::deserialize(deserializer)?;
if bytes.len() != 64 {
return Err(serde::de::Error::custom("signature must be 64 bytes"));
}
let mut arr = [0u8; 64];
arr.copy_from_slice(&bytes);
Ok(Signature64(arr))
}
}
}
impl CapabilityAnnouncement {
    /// TTL, in seconds, given to announcements created by [`Self::new`].
    pub const DEFAULT_TTL_SECS: u32 = 300;

    /// Current wall-clock time in nanoseconds since the Unix epoch, or 0
    /// when the system clock reads earlier than the epoch.
    fn unix_now_ns() -> u64 {
        use std::time::{SystemTime, UNIX_EPOCH};
        match SystemTime::now().duration_since(UNIX_EPOCH) {
            Ok(elapsed) => elapsed.as_nanos() as u64,
            Err(_) => 0,
        }
    }

    /// Creates an unsigned announcement stamped with the current time.
    ///
    /// The TTL is [`Self::DEFAULT_TTL_SECS`], the hop count is 0, and the
    /// reflex address and all allow-lists are empty.
    pub fn new(
        node_id: u64,
        entity_id: super::super::identity::EntityId,
        version: u64,
        capabilities: CapabilitySet,
    ) -> Self {
        let timestamp_ns = Self::unix_now_ns();
        Self {
            node_id,
            entity_id,
            version,
            timestamp_ns,
            ttl_secs: Self::DEFAULT_TTL_SECS,
            capabilities,
            signature: None,
            hop_count: 0,
            reflex_addr: None,
            allowed_nodes: Vec::new(),
            allowed_subnets: Vec::new(),
            allowed_groups: Vec::new(),
        }
    }

    /// Replaces the TTL (seconds).
    pub fn with_ttl(self, ttl_secs: u32) -> Self {
        Self { ttl_secs, ..self }
    }

    /// Replaces the reflex address; `None` clears it.
    pub fn with_reflex_addr(self, reflex: Option<std::net::SocketAddr>) -> Self {
        Self {
            reflex_addr: reflex,
            ..self
        }
    }

    /// Attaches a pre-computed signature without checking it.
    pub fn with_signature(self, sig: [u8; 64]) -> Self {
        Self {
            signature: Some(Signature64(sig)),
            ..self
        }
    }

    /// Bytes that are signed and verified: the JSON form of the announcement
    /// with `signature` cleared and `hop_count` zeroed, so neither field
    /// affects the signature.
    #[expect(
        clippy::expect_used,
        reason = "no CapabilityAnnouncement field has a fallible Serialize impl today; panic is the documented loud-diagnostic strategy for a future refactor that introduces one"
    )]
    fn signed_payload(&self) -> Vec<u8> {
        let canonical = Self {
            signature: None,
            hop_count: 0,
            ..self.clone()
        };
        serde_json::to_vec(&canonical).expect(
            "CapabilityAnnouncement::signed_payload: serde_json::to_vec is infallible \
             over the current field set; if this ever fires, a fallible Serialize impl \
             was added and the signed transcript must be re-designed before merging",
        )
    }

    /// Signs the announcement with `keypair`, replacing any existing signature.
    pub fn sign(&mut self, keypair: &super::super::identity::EntityKeypair) {
        let transcript = self.signed_payload();
        let signature = keypair.sign(&transcript);
        self.signature = Some(Signature64(signature.to_bytes()));
    }

    /// Verifies the signature against `entity_id`.
    ///
    /// # Errors
    /// Returns `EntityError::InvalidSignature` when no signature is attached;
    /// otherwise returns whatever `entity_id.verify` reports.
    pub fn verify(&self) -> Result<(), super::super::identity::EntityError> {
        let raw = match self.signature {
            Some(Signature64(bytes)) => bytes,
            None => return Err(super::super::identity::EntityError::InvalidSignature),
        };
        let transcript = self.signed_payload();
        let signature = ed25519_dalek::Signature::from_bytes(&raw);
        self.entity_id.verify(&transcript, &signature)
    }

    /// Serializes the announcement as JSON. A serialization failure yields
    /// an empty byte vector rather than an error.
    pub fn to_bytes(&self) -> Vec<u8> {
        match serde_json::to_vec(self) {
            Ok(bytes) => bytes,
            Err(_) => Vec::new(),
        }
    }

    /// Parses an announcement from JSON bytes.
    ///
    /// Returns `None` when the input does not parse or when any allow-list
    /// holds more than `MAX_ALLOW_LIST_LEN` entries.
    pub fn from_bytes(data: &[u8]) -> Option<Self> {
        let ann: Self = serde_json::from_slice(data).ok()?;
        let longest_list = ann
            .allowed_nodes
            .len()
            .max(ann.allowed_subnets.len())
            .max(ann.allowed_groups.len());
        if longest_list > MAX_ALLOW_LIST_LEN {
            None
        } else {
            Some(ann)
        }
    }

    /// Removes every metadata entry whose key is listed as reserved in the
    /// axis schema, either exactly or by prefix.
    pub fn strip_reserved_metadata(&mut self) {
        use super::schema::AXIS_SCHEMA;
        self.capabilities.metadata.retain(|key, _| {
            let is_reserved = AXIS_SCHEMA.metadata_reserved.contains(&key.as_str())
                || AXIS_SCHEMA
                    .metadata_reserved_prefixes
                    .iter()
                    .any(|prefix| key.starts_with(prefix));
            !is_reserved
        });
    }

    /// Whether the announcement's age, in whole seconds, has reached its TTL.
    ///
    /// A timestamp in the future counts as age 0. A TTL of 0 is always
    /// expired.
    pub fn is_expired(&self) -> bool {
        let elapsed_ns = Self::unix_now_ns().saturating_sub(self.timestamp_ns);
        let age_secs = elapsed_ns / 1_000_000_000;
        age_secs >= u64::from(self.ttl_secs)
    }
}
/// Hard requirements evaluated against a `CapabilitySet` by `matches`.
/// The default filter has no requirements.
#[derive(Debug, Clone, Default)]
pub struct CapabilityFilter {
/// Tags that must ALL be present.
pub require_tags: Vec<String>,
/// Model ids of which AT LEAST ONE must be present (ignored when empty).
pub require_models: Vec<String>,
/// Tool ids of which AT LEAST ONE must be present (ignored when empty).
pub require_tools: Vec<String>,
/// Minimum system memory, compared against `HardwareCapabilities::memory_gb`.
pub min_memory_gb: Option<u32>,
/// Whether a GPU is required; also switched on by `with_gpu_vendor` and `with_min_vram`.
pub require_gpu: bool,
/// Required vendor of the primary GPU.
pub gpu_vendor: Option<GpuVendor>,
/// Minimum total VRAM across all GPUs.
pub min_vram_gb: Option<u32>,
/// Minimum context length that at least one model must offer.
pub min_context_length: Option<u32>,
/// Modalities that must EACH be supported by at least one model (not
/// necessarily the same model).
pub require_modalities: Vec<Modality>,
}
impl CapabilityFilter {
    /// Creates a filter with no requirements; it matches every set.
    pub fn new() -> Self {
        Default::default()
    }

    /// Adds a tag that must be present.
    pub fn require_tag(mut self, tag: impl Into<String>) -> Self {
        let tag: String = tag.into();
        self.require_tags.push(tag);
        self
    }

    /// Adds a model id to the any-of model list.
    pub fn require_model(mut self, model: impl Into<String>) -> Self {
        let model: String = model.into();
        self.require_models.push(model);
        self
    }

    /// Adds a tool id to the any-of tool list.
    pub fn require_tool(mut self, tool: impl Into<String>) -> Self {
        let tool: String = tool.into();
        self.require_tools.push(tool);
        self
    }

    /// Requires at least `gb` of system memory.
    pub fn with_min_memory(self, gb: u32) -> Self {
        Self {
            min_memory_gb: Some(gb),
            ..self
        }
    }

    /// Requires a GPU to be present.
    pub fn require_gpu(self) -> Self {
        Self {
            require_gpu: true,
            ..self
        }
    }

    /// Requires the primary GPU to be from `vendor`; implies `require_gpu`.
    pub fn with_gpu_vendor(self, vendor: GpuVendor) -> Self {
        Self {
            gpu_vendor: Some(vendor),
            require_gpu: true,
            ..self
        }
    }

    /// Requires at least `gb` of total VRAM; implies `require_gpu`.
    pub fn with_min_vram(self, gb: u32) -> Self {
        Self {
            min_vram_gb: Some(gb),
            require_gpu: true,
            ..self
        }
    }

    /// Requires at least one model with a context length of `length` or more.
    pub fn with_min_context(self, length: u32) -> Self {
        Self {
            min_context_length: Some(length),
            ..self
        }
    }

    /// Adds a modality that some model must support. Duplicates are not filtered.
    pub fn require_modality(mut self, modality: Modality) -> Self {
        self.require_modalities.push(modality);
        self
    }

    /// Whether `caps` satisfies every requirement of this filter.
    ///
    /// Tags are all-of; models and tools are any-of (and skipped when their
    /// list is empty); each required modality must be offered by at least
    /// one model. Checks run cheapest-first and stop at the first failure.
    pub fn matches(&self, caps: &CapabilitySet) -> bool {
        let has_all_tags = self.require_tags.iter().all(|tag| caps.has_tag(tag));
        if !has_all_tags {
            return false;
        }

        let model_ok = self.require_models.is_empty()
            || self.require_models.iter().any(|m| caps.has_model(m));
        if !model_ok {
            return false;
        }

        let tool_ok =
            self.require_tools.is_empty() || self.require_tools.iter().any(|t| caps.has_tool(t));
        if !tool_ok {
            return false;
        }

        // Decoded views are created only once the tag-level checks passed.
        let views = caps.views();

        let too_little_memory = self
            .min_memory_gb
            .map_or(false, |min_mem| views.hardware().memory_gb < min_mem);
        if too_little_memory {
            return false;
        }

        if self.require_gpu && !caps.has_gpu() {
            return false;
        }

        let wrong_vendor = self
            .gpu_vendor
            .map_or(false, |vendor| views.hardware().gpu_vendor() != Some(vendor));
        if wrong_vendor {
            return false;
        }

        let too_little_vram = self
            .min_vram_gb
            .map_or(false, |min_vram| views.hardware().total_vram_gb() < min_vram);
        if too_little_vram {
            return false;
        }

        let context_too_short = self.min_context_length.map_or(false, |min_ctx| {
            !views.models().iter().any(|m| m.context_length >= min_ctx)
        });
        if context_too_short {
            return false;
        }

        self.require_modalities.iter().all(|modality| {
            views
                .models()
                .iter()
                .any(|m| m.modalities.contains(modality))
        })
    }
}
/// A hard filter plus soft preference weights, evaluated by `score`.
///
/// The `prefer_*` builder methods clamp each weight to `[0.0, 1.0]`; the
/// public fields themselves are not clamped when written directly. A weight
/// contributes to the score only when it is greater than 0.
#[derive(Debug, Clone, Default)]
pub struct CapabilityRequirement {
/// Hard requirements; a set that fails them scores 0.
pub filter: CapabilityFilter,
/// Weight for more system memory (ratio saturates at 256 GB).
pub prefer_more_memory: f32,
/// Weight for more total VRAM (ratio saturates at 80 GB).
pub prefer_more_vram: f32,
/// Weight for faster inference, based on the highest `tokens_per_sec` (ratio saturates at 1000).
pub prefer_faster_inference: f32,
/// Weight for the fraction of models that are loaded.
pub prefer_loaded_models: f32,
}
impl CapabilityRequirement {
    /// Wraps `filter` in a requirement with every preference weight at zero.
    pub fn from_filter(filter: CapabilityFilter) -> Self {
        Self {
            filter,
            ..Self::default()
        }
    }

    /// Sets the weight of the "more system memory" preference, clamped to `0.0..=1.0`.
    pub fn prefer_memory(self, weight: f32) -> Self {
        Self {
            prefer_more_memory: weight.clamp(0.0, 1.0),
            ..self
        }
    }

    /// Sets the weight of the "more VRAM" preference, clamped to `0.0..=1.0`.
    pub fn prefer_vram(self, weight: f32) -> Self {
        Self {
            prefer_more_vram: weight.clamp(0.0, 1.0),
            ..self
        }
    }

    /// Sets the weight of the "faster inference" preference, clamped to `0.0..=1.0`.
    pub fn prefer_speed(self, weight: f32) -> Self {
        Self {
            prefer_faster_inference: weight.clamp(0.0, 1.0),
            ..self
        }
    }

    /// Sets the weight of the "models already loaded" preference, clamped to `0.0..=1.0`.
    pub fn prefer_loaded(self, weight: f32) -> Self {
        Self {
            prefer_loaded_models: weight.clamp(0.0, 1.0),
            ..self
        }
    }

    /// Scores `caps` against this requirement.
    ///
    /// Returns `0.0` when the filter rejects `caps`. Otherwise the score is
    /// `1.0` plus, for each preference with a weight above zero,
    /// `weight * normalized_value`, where each normalized value lies in
    /// `0.0..=1.0`:
    /// - memory: `memory_gb / 256`, capped at 1;
    /// - VRAM: `total_vram_gb / 80`, capped at 1;
    /// - speed: best `tokens_per_sec` over all models `/ 1000`, capped at 1
    ///   (0 when there are no models);
    /// - loaded: fraction of models flagged `loaded` (0 when there are none).
    pub fn score(&self, caps: &CapabilitySet) -> f32 {
        if !self.filter.matches(caps) {
            return 0.0;
        }

        let views = caps.views();
        // Normalizes a raw measurement against its saturation point.
        let saturate = |value: f32, ceiling: f32| (value / ceiling).min(1.0);

        // Every accepted candidate starts from the same baseline; bonuses are
        // accumulated in a fixed order.
        let mut total = 1.0;

        if self.prefer_more_memory > 0.0 {
            let bonus = saturate(views.hardware().memory_gb as f32, 256.0);
            total += self.prefer_more_memory * bonus;
        }

        if self.prefer_more_vram > 0.0 {
            let bonus = saturate(views.hardware().total_vram_gb() as f32, 80.0);
            total += self.prefer_more_vram * bonus;
        }

        if self.prefer_faster_inference > 0.0 {
            let best_tps = views
                .models()
                .iter()
                .fold(0u32, |best, m| best.max(m.tokens_per_sec));
            total += self.prefer_faster_inference * saturate(best_tps as f32, 1000.0);
        }

        if self.prefer_loaded_models > 0.0 {
            let models = views.models();
            let loaded_fraction = match models.len() {
                0 => 0.0,
                model_count => {
                    models.iter().filter(|m| m.loaded).count() as f32 / model_count as f32
                }
            };
            total += self.prefer_loaded_models * loaded_fraction;
        }

        total
    }
}
/// Source of cardinality figures for tag axes and metadata keys.
///
/// NOTE(review): no implementor is visible in this part of the file, so the
/// exact meaning of "cardinality" (presumably the number of distinct values
/// observed for the key) should be confirmed against the implementations.
pub trait CardinalityProvider {
/// Returns the cardinality associated with the tag axis key `key`.
fn axis_cardinality(&self, key: &crate::adapter::net::behavior::tag::TagKey) -> usize;
/// Returns the cardinality of values associated with the metadata key `key`.
fn metadata_value_cardinality(&self, key: &str) -> usize;
}
// Unit tests for capability sets, filters, scoring, announcements (expiry,
// hop count, signatures, allow-lists), scope matching, reserved-tag builders,
// view projections, diffs and the JSON wire format.
#[cfg(test)]
mod tests {
use super::*;
// Builds an `EntityId` from 32 zero bytes for tests that do not sign anything.
fn test_entity() -> super::super::super::identity::EntityId {
super::super::super::identity::EntityId::from_bytes([0u8; 32])
}
// `strip_reserved_metadata` must remove `intent`, `colocate-with`, `priority`,
// `owner` and `tool::pwn`, while `app::region` and `user_tag` survive.
#[test]
fn strip_reserved_metadata_drops_reserved_keys() {
let mut ann = CapabilityAnnouncement::new(0xDEAD, test_entity(), 7, CapabilitySet::new());
ann.capabilities
.metadata
.insert("intent".into(), "evil-tenant".into());
ann.capabilities
.metadata
.insert("colocate-with".into(), "0xdeadbeef".into());
ann.capabilities
.metadata
.insert("priority".into(), "9999".into());
ann.capabilities
.metadata
.insert("owner".into(), "attacker".into());
ann.capabilities
.metadata
.insert("tool::pwn".into(), "go-brrr".into());
ann.capabilities
.metadata
.insert("app::region".into(), "us-east".into());
ann.capabilities
.metadata
.insert("user_tag".into(), "fine".into());
ann.strip_reserved_metadata();
assert!(!ann.capabilities.metadata.contains_key("intent"));
assert!(!ann.capabilities.metadata.contains_key("colocate-with"));
assert!(!ann.capabilities.metadata.contains_key("priority"));
assert!(!ann.capabilities.metadata.contains_key("owner"));
assert!(!ann.capabilities.metadata.contains_key("tool::pwn"));
assert_eq!(
ann.capabilities
.metadata
.get("app::region")
.map(String::as_str),
Some("us-east"),
);
assert_eq!(
ann.capabilities
.metadata
.get("user_tag")
.map(String::as_str),
Some("fine"),
);
}
// A signed announcement verifies both locally and after a byte round-trip;
// stripping reserved metadata afterwards must make `verify` fail.
#[test]
fn strip_reserved_metadata_invalidates_signature() {
use super::super::super::identity::EntityKeypair;
let keypair = EntityKeypair::generate();
let mut ann =
CapabilityAnnouncement::new(1, keypair.entity_id().clone(), 1, sample_capability_set());
ann.capabilities
.metadata
.insert("intent".into(), "compute".into());
ann.sign(&keypair);
assert!(ann.verify().is_ok());
let forward_bytes = ann.to_bytes();
let forwarded =
CapabilityAnnouncement::from_bytes(&forward_bytes).expect("forwarded parses");
assert!(
forwarded.verify().is_ok(),
"downstream verifier must accept the un-stripped wire bytes"
);
ann.strip_reserved_metadata();
assert!(
ann.verify().is_err(),
"strip must invalidate the signature so the substrate is forced \
to strip the local copy AFTER any re-broadcast"
);
}
// Shared fixture: one Nvidia GPU (24 GB), 64 GB of memory, Linux software
// stack, one loaded llama model, one tool, the tags "inference" and "gpu",
// and a concurrency limit of 10.
fn sample_capability_set() -> CapabilitySet {
let gpu = GpuInfo::new(GpuVendor::Nvidia, "RTX 4090", 24)
.with_compute_units(128)
.with_tensor_cores(512)
.with_fp16_tflops(82.5);
let hardware = HardwareCapabilities::new()
.with_cpu(16, 32)
.with_memory(64)
.with_gpu(gpu)
.with_storage(2000)
.with_network(10);
let software = SoftwareCapabilities::new()
.with_os("linux", "6.1")
.add_runtime("python", "3.11")
.add_framework("pytorch", "2.1")
.with_cuda("12.1");
let model = ModelCapability::new("llama-3.1-70b", "llama")
.with_parameters(70.0)
.with_context_length(128000)
.with_quantization("fp16")
.add_modality(Modality::Text)
.add_modality(Modality::Code)
.with_tokens_per_sec(50)
.with_loaded(true);
let tool = ToolCapability::new("python_repl", "Python REPL")
.with_version("1.0.0")
.with_estimated_time(100);
CapabilitySet::new()
.with_hardware(hardware)
.with_software(software)
.add_model(model)
.add_tool(tool)
.add_tag("inference")
.add_tag("gpu")
.with_limits(ResourceLimits::new().with_max_concurrent(10))
}
// The fixture exposes its GPU, tag, model, tool and memory through the
// `CapabilitySet` accessors.
#[test]
fn test_capability_set_creation() {
let caps = sample_capability_set();
assert!(caps.has_gpu());
assert!(caps.has_tag("inference"));
assert!(caps.has_model("llama-3.1-70b"));
assert!(caps.has_tool("python_repl"));
assert_eq!(caps.views().hardware().memory_gb, 64);
}
// `to_bytes` / `from_bytes` preserve memory, tags and the model count.
#[test]
fn test_capability_set_serialization() {
let caps = sample_capability_set();
let bytes = caps.to_bytes();
let parsed = CapabilitySet::from_bytes(&bytes).unwrap();
assert_eq!(
caps.views().hardware().memory_gb,
parsed.views().hardware().memory_gb,
);
assert_eq!(caps.tags, parsed.tags);
assert_eq!(caps.views().models().len(), parsed.views().models().len());
}
// `with_metadata` drops a `tool::`-prefixed key but keeps `region` and `intent`.
#[test]
fn with_metadata_drops_reserved_prefix_keys() {
let caps = CapabilitySet::new()
.with_metadata("tool::evil::input_schema", "spoof")
.with_metadata("region", "us-east");
assert!(
!caps.metadata.contains_key("tool::evil::input_schema"),
"with_metadata must drop reserved-prefix keys: {:?}",
caps.metadata
);
assert_eq!(
caps.metadata.get("region").map(|s| s.as_str()),
Some("us-east")
);
let caps = CapabilitySet::new().with_metadata("intent", "ml-training");
assert_eq!(
caps.metadata.get("intent").map(|s| s.as_str()),
Some("ml-training")
);
}
// `has_tag` treats the `=` and `:` separator spellings of an axis/value tag
// as equivalent, regardless of which separator the stored tag carries.
#[test]
fn has_tag_matches_across_separator_forms() {
use crate::adapter::net::behavior::tag::{AxisSeparator, Tag, TaxonomyAxis};
let mut caps = CapabilitySet::new();
caps.tags.insert(Tag::AxisValue {
axis: TaxonomyAxis::Software,
key: "os".to_string(),
value: "linux".to_string(),
separator: AxisSeparator::Colon,
});
assert!(caps.has_tag("software.os=linux"));
assert!(caps.has_tag("software.os:linux"));
assert!(!caps.has_tag("software.os=darwin"));
let mut caps = CapabilitySet::new();
caps.tags.insert(Tag::AxisValue {
axis: TaxonomyAxis::Hardware,
key: "gpu.vram_gb".to_string(),
value: "80".to_string(),
separator: AxisSeparator::Eq,
});
assert!(caps.has_tag("hardware.gpu.vram_gb:80"));
assert!(caps.has_tag("hardware.gpu.vram_gb=80"));
}
// Positive and negative `CapabilityFilter::matches` cases for tags, GPU
// presence, GPU vendor, minimum memory and required model.
#[test]
fn test_capability_filter_matches() {
let caps = sample_capability_set();
let filter = CapabilityFilter::new().require_tag("inference");
assert!(filter.matches(&caps));
let filter = CapabilityFilter::new().require_tag("training");
assert!(!filter.matches(&caps));
let filter = CapabilityFilter::new().require_gpu();
assert!(filter.matches(&caps));
let filter = CapabilityFilter::new().with_gpu_vendor(GpuVendor::Nvidia);
assert!(filter.matches(&caps));
let filter = CapabilityFilter::new().with_gpu_vendor(GpuVendor::Amd);
assert!(!filter.matches(&caps));
let filter = CapabilityFilter::new().with_min_memory(32);
assert!(filter.matches(&caps));
let filter = CapabilityFilter::new().with_min_memory(128);
assert!(!filter.matches(&caps));
let filter = CapabilityFilter::new().require_model("llama-3.1-70b");
assert!(filter.matches(&caps));
let filter = CapabilityFilter::new().require_model("gpt-4");
assert!(!filter.matches(&caps));
}
// A matching candidate with positive preference weights scores above the
// 1.0 baseline.
#[test]
fn test_capability_requirement_scoring() {
let caps = sample_capability_set();
let req = CapabilityRequirement::from_filter(CapabilityFilter::new().require_gpu())
.prefer_memory(0.5)
.prefer_vram(0.5)
.prefer_speed(0.5);
let score = req.score(&caps);
assert!(score > 1.0); }
// A fresh announcement is not expired; one stamped at time 0 with a 1 s TTL is.
#[test]
fn test_capability_announcement_expiry() {
let caps = sample_capability_set();
let mut ann = CapabilityAnnouncement::new(1, test_entity(), 1, caps);
assert!(!ann.is_expired());
ann.timestamp_ns = 0;
ann.ttl_secs = 1;
assert!(ann.is_expired());
}
// Table of (ttl_secs, age_secs, expected, label) cases for `is_expired`.
// The timestamp is derived from the wall clock, so the cases assume the loop
// body runs in well under a second.
#[test]
fn announcement_is_expired_table_driven_across_ttl_buckets() {
use std::time::{SystemTime, UNIX_EPOCH};
let now_ns = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos() as u64;
let sec_ns = 1_000_000_000u64;
let cases: &[(u32, u64, bool, &str)] = &[
(0, 0, true, "ttl=0 fresh"),
(1, 0, false, "ttl=1s fresh"),
(1, 2, true, "ttl=1s aged 2s"),
(3_600, 1, false, "ttl=1h aged 1s"),
(3_600, 3_599, false, "ttl=1h aged 3599s"),
(3_600, 3_600, true, "ttl=1h aged exactly 3600s (inclusive)"),
(3_600, 3_601, true, "ttl=1h aged 3601s"),
(31_536_000, 86_400, false, "ttl=1yr aged 1 day"),
(31_536_000, 31_536_001, true, "ttl=1yr aged just past"),
(u32::MAX, 31_536_000, false, "ttl=u32::MAX aged 1 year"),
];
for &(ttl_secs, age_secs, expected, label) in cases {
let mut ann = CapabilityAnnouncement::new(1, test_entity(), 1, sample_capability_set());
ann.ttl_secs = ttl_secs;
ann.timestamp_ns = now_ns.saturating_sub(age_secs.saturating_mul(sec_ns));
assert_eq!(
ann.is_expired(),
expected,
"is_expired({label}) must be {expected}",
);
}
}
// A newly constructed announcement has `hop_count == 0`.
#[test]
fn hop_count_defaults_to_zero() {
let ann = CapabilityAnnouncement::new(1, test_entity(), 1, sample_capability_set());
assert_eq!(ann.hop_count, 0);
}
// A non-zero `hop_count` survives `to_bytes` / `from_bytes`.
#[test]
fn hop_count_roundtrips_through_serde() {
let mut ann = CapabilityAnnouncement::new(1, test_entity(), 1, sample_capability_set());
ann.hop_count = 7;
let bytes = ann.to_bytes();
let restored = CapabilityAnnouncement::from_bytes(&bytes).expect("parse");
assert_eq!(restored.hop_count, 7);
}
// A JSON payload that has no `hop_count` key still parses, with the field at 0.
#[test]
fn old_format_without_hop_count_parses_as_zero() {
let payload = serde_json::json!({
"node_id": 1,
"entity_id": hex::encode([0u8; 32]),
"version": 1,
"timestamp_ns": 0u64,
"ttl_secs": 300u32,
"capabilities": sample_capability_set(),
});
let bytes = serde_json::to_vec(&payload).expect("serialize");
let parsed = CapabilityAnnouncement::from_bytes(&bytes).expect("parse old format");
assert_eq!(parsed.hop_count, 0);
}
// Changing `hop_count` after signing must not break `verify`, for every
// value from 1 up to `MAX_CAPABILITY_HOPS`.
#[test]
fn signature_verifies_across_hop_count_bumps() {
use super::super::super::identity::EntityKeypair;
let keypair = EntityKeypair::generate();
let mut ann =
CapabilityAnnouncement::new(1, keypair.entity_id().clone(), 1, sample_capability_set());
ann.sign(&keypair);
assert!(ann.verify().is_ok());
for bumped in 1..=MAX_CAPABILITY_HOPS {
ann.hop_count = bumped;
assert!(
ann.verify().is_ok(),
"signature should remain valid after hop_count={}",
bumped
);
}
}
// Flipping a bit of `node_id` after signing must make `verify` fail.
#[test]
fn signature_rejects_tampered_payload_even_at_hop_zero() {
use super::super::super::identity::EntityKeypair;
let keypair = EntityKeypair::generate();
let mut ann =
CapabilityAnnouncement::new(1, keypair.entity_id().clone(), 1, sample_capability_set());
ann.sign(&keypair);
ann.node_id ^= 0x01;
assert!(ann.verify().is_err());
}
// Pins `MAX_CAPABILITY_HOPS` at 16.
#[test]
fn max_capability_hops_matches_pingwave_contract() {
assert_eq!(MAX_CAPABILITY_HOPS, 16);
}
// With all three allow-lists empty, none of their keys appear in the
// serialized bytes.
#[test]
fn empty_allow_lists_omit_fields_from_wire() {
let ann = CapabilityAnnouncement::new(
42,
super::super::super::identity::EntityId::from_bytes([0xAA; 32]),
1,
sample_capability_set(),
);
let bytes = ann.to_bytes();
let s = std::str::from_utf8(&bytes).unwrap();
assert!(
!s.contains("allowed_nodes"),
"empty allowed_nodes must be skipped on the wire; got: {}",
s
);
assert!(
!s.contains("allowed_subnets"),
"empty allowed_subnets must be skipped on the wire; got: {}",
s
);
assert!(
!s.contains("allowed_groups"),
"empty allowed_groups must be skipped on the wire; got: {}",
s
);
}
// Populated node, subnet and group allow-lists survive a byte round-trip.
#[test]
fn populated_allow_lists_round_trip() {
let mut ann = CapabilityAnnouncement::new(
7,
super::super::super::identity::EntityId::from_bytes([0xBB; 32]),
2,
sample_capability_set(),
);
ann.allowed_nodes = vec![100, 200, 300];
ann.allowed_subnets = vec![super::super::subnet::SubnetId([0x11; 16])];
ann.allowed_groups = vec![
super::super::group::GroupId([0x33; 32]),
super::super::group::GroupId([0x44; 32]),
];
let bytes = ann.to_bytes();
let decoded = CapabilityAnnouncement::from_bytes(&bytes).expect("decode");
assert_eq!(decoded.allowed_nodes, ann.allowed_nodes);
assert_eq!(decoded.allowed_subnets, ann.allowed_subnets);
assert_eq!(decoded.allowed_groups, ann.allowed_groups);
}
// The payload returned by `signed_payload` parses as a JSON object that has
// no allow-list keys when the lists are empty.
#[test]
fn signed_payload_omits_empty_allow_lists() {
use super::super::super::identity::EntityKeypair;
let keypair = EntityKeypair::generate();
let ann =
CapabilityAnnouncement::new(5, keypair.entity_id().clone(), 1, sample_capability_set());
let canonical = ann.signed_payload();
let v: serde_json::Value = serde_json::from_slice(&canonical).expect("parse");
let obj = v.as_object().expect("object");
assert!(
!obj.contains_key("allowed_nodes"),
"pre-v0.4 wire shape must not carry allowed_nodes when empty"
);
assert!(
!obj.contains_key("allowed_subnets"),
"pre-v0.4 wire shape must not carry allowed_subnets when empty"
);
assert!(
!obj.contains_key("allowed_groups"),
"pre-v0.4 wire shape must not carry allowed_groups when empty"
);
}
// An announcement signed with populated allow-lists still verifies after a
// byte round-trip.
#[test]
fn signed_announcement_with_allow_lists_verifies_after_round_trip() {
use super::super::super::identity::EntityKeypair;
let keypair = EntityKeypair::generate();
let mut ann =
CapabilityAnnouncement::new(9, keypair.entity_id().clone(), 1, sample_capability_set());
ann.allowed_nodes = vec![1, 2, 3];
ann.allowed_subnets = vec![super::super::subnet::SubnetId([0x55; 16])];
ann.allowed_groups = vec![super::super::group::GroupId([0x66; 32])];
ann.sign(&keypair);
let bytes = ann.to_bytes();
let decoded = CapabilityAnnouncement::from_bytes(&bytes).expect("decode");
assert!(
decoded.verify().is_ok(),
"signature must cover the new allow-list fields end-to-end"
);
}
// Appending an entry to any one of the three allow-lists after signing must
// make `verify` fail.
#[test]
fn signed_announcement_rejects_tampered_allow_lists() {
use super::super::super::identity::EntityKeypair;
let keypair = EntityKeypair::generate();
for which in &["nodes", "subnets", "groups"] {
let mut ann = CapabilityAnnouncement::new(
9,
keypair.entity_id().clone(),
1,
sample_capability_set(),
);
ann.allowed_nodes = vec![1, 2];
ann.allowed_subnets = vec![super::super::subnet::SubnetId([0x77; 16])];
ann.allowed_groups = vec![super::super::group::GroupId([0x88; 32])];
ann.sign(&keypair);
match *which {
"nodes" => ann.allowed_nodes.push(999),
"subnets" => ann
.allowed_subnets
.push(super::super::subnet::SubnetId([0x99; 16])),
"groups" => ann
.allowed_groups
.push(super::super::group::GroupId([0xAA; 32])),
_ => unreachable!(),
}
assert!(
ann.verify().is_err(),
"tampering with allowed_{} must invalidate signature",
which
);
}
}
// Pins `MAX_ALLOW_LIST_LEN` at 64.
#[test]
fn allow_list_cap_documented() {
assert_eq!(MAX_ALLOW_LIST_LEN, 64);
}
// `from_bytes` returns `None` when any allow-list holds one entry more than
// `MAX_ALLOW_LIST_LEN`. The `as u8` casts rely on the cap staying below 255.
#[test]
fn from_bytes_rejects_allow_list_over_cap() {
for which in ["nodes", "subnets", "groups"] {
let mut ann = CapabilityAnnouncement::new(
1,
super::super::super::identity::EntityId::from_bytes([0xAA; 32]),
1,
sample_capability_set(),
);
match which {
"nodes" => {
ann.allowed_nodes = (0..(MAX_ALLOW_LIST_LEN as u64) + 1).collect();
}
"subnets" => {
ann.allowed_subnets = (0..(MAX_ALLOW_LIST_LEN as u8) + 1)
.map(|i| super::super::subnet::SubnetId([i; 16]))
.collect();
}
"groups" => {
ann.allowed_groups = (0..(MAX_ALLOW_LIST_LEN as u8) + 1)
.map(|i| super::super::group::GroupId([i; 32]))
.collect();
}
_ => unreachable!(),
}
let bytes = ann.to_bytes();
assert!(
CapabilityAnnouncement::from_bytes(&bytes).is_none(),
"from_bytes must reject allowed_{which} exceeding MAX_ALLOW_LIST_LEN",
);
}
}
// A node allow-list of exactly `MAX_ALLOW_LIST_LEN` entries is accepted.
#[test]
fn from_bytes_accepts_allow_list_at_cap() {
let mut ann = CapabilityAnnouncement::new(
1,
super::super::super::identity::EntityId::from_bytes([0xAB; 32]),
1,
sample_capability_set(),
);
ann.allowed_nodes = (0..MAX_ALLOW_LIST_LEN as u64).collect();
let bytes = ann.to_bytes();
let decoded =
CapabilityAnnouncement::from_bytes(&bytes).expect("exactly-at-cap must deserialize");
assert_eq!(decoded.allowed_nodes.len(), MAX_ALLOW_LIST_LEN);
}
// A `reflex_addr` that is set survives a byte round-trip.
#[test]
fn reflex_addr_roundtrips_through_serde_when_set() {
let reflex: std::net::SocketAddr = "198.51.100.5:54321".parse().unwrap();
let ann = CapabilityAnnouncement::new(1, test_entity(), 1, sample_capability_set())
.with_reflex_addr(Some(reflex));
let bytes = ann.to_bytes();
let restored = CapabilityAnnouncement::from_bytes(&bytes).expect("parse");
assert_eq!(restored.reflex_addr, Some(reflex));
}
// Without a reflex address, the serialized text does not contain the
// `reflex_addr` key.
#[test]
fn reflex_addr_none_is_omitted_from_wire_bytes() {
let ann = CapabilityAnnouncement::new(1, test_entity(), 1, sample_capability_set());
let bytes = ann.to_bytes();
let text = std::str::from_utf8(&bytes).expect("valid utf8");
assert!(
!text.contains("reflex_addr"),
"reflex_addr key must be omitted when the field is None; got: {text}",
);
}
// `hop_count` is absent from the serialized text when 0 and present as
// `"hop_count":3` when set to 3.
#[test]
fn hop_count_zero_omits_key_while_nonzero_keeps_it() {
let caps = sample_capability_set();
let mut ann = CapabilityAnnouncement::new(1, test_entity(), 1, caps);
let zero_bytes = ann.to_bytes();
let zero_str = std::str::from_utf8(&zero_bytes).expect("utf8");
assert!(
!zero_str.contains("hop_count"),
"hop_count=0 must be omitted from serialized output",
);
ann.hop_count = 3;
let bumped_bytes = ann.to_bytes();
let bumped_str = std::str::from_utf8(&bumped_bytes).expect("utf8");
assert!(
bumped_str.contains("\"hop_count\":3"),
"hop_count>0 must survive serialization so forwarders \
can read + bump. Got: {}",
bumped_str,
);
}
// A `Global` scope matches tenant, region, `Any` and `GlobalOnly` filters;
// a tenant-restricted scope does not match `GlobalOnly`.
#[test]
fn matches_scope_global_visible_to_tenant_filter() {
let global = CapabilityScope::Global;
assert!(matches_scope(
&global,
&ScopeFilter::Tenant("oem-123"),
false
));
assert!(matches_scope(
&global,
&ScopeFilter::Region("eu-west"),
false
));
assert!(matches_scope(&global, &ScopeFilter::Any, false));
assert!(matches_scope(&global, &ScopeFilter::GlobalOnly, false));
let tenant_only = CapabilityScope::Tenants(vec!["foo".to_string()]);
assert!(!matches_scope(
&tenant_only,
&ScopeFilter::GlobalOnly,
false
));
}
// A `SubnetLocal` scope matches only `SameSubnet` with the boolean argument
// set to `true`; a tenant scope matches only its own tenant name.
#[test]
fn matches_scope_subnet_local_excluded_from_any() {
let sl = CapabilityScope::SubnetLocal;
assert!(!matches_scope(&sl, &ScopeFilter::Any, false));
assert!(!matches_scope(&sl, &ScopeFilter::Any, true));
assert!(!matches_scope(&sl, &ScopeFilter::Tenant("foo"), true));
assert!(!matches_scope(&sl, &ScopeFilter::GlobalOnly, true));
assert!(matches_scope(&sl, &ScopeFilter::SameSubnet, true));
assert!(!matches_scope(&sl, &ScopeFilter::SameSubnet, false));
let tenants = CapabilityScope::Tenants(vec!["oem-123".to_string()]);
assert!(matches_scope(
&tenants,
&ScopeFilter::Tenant("oem-123"),
false
));
assert!(!matches_scope(
&tenants,
&ScopeFilter::Tenant("other"),
false
));
}
// `with_tenant_scope` adds a `scope:tenant:<name>` tag that
// `scope_from_membership_tags` resolves to `CapabilityScope::Tenants`.
#[test]
fn with_tenant_scope_appends_prefixed_tag() {
let caps = CapabilitySet::new()
.add_tag("gpu")
.with_tenant_scope("oem-123");
assert!(caps.has_tag("gpu"));
assert!(caps.has_tag("scope:tenant:oem-123"));
let wire_tags: Vec<String> = caps.tags.iter().map(|t| t.to_string()).collect();
let resolved =
super::super::fold::capability_bridge::scope_from_membership_tags(&wire_tags);
assert_eq!(
resolved,
CapabilityScope::Tenants(vec!["oem-123".to_string()]),
);
}
// Adding the same tenant twice plus an empty tenant name leaves exactly one
// tenant-scope tag.
#[test]
fn with_tenant_scope_is_idempotent_and_drops_empty() {
let caps = CapabilitySet::new()
.with_tenant_scope("oem-123")
.with_tenant_scope("oem-123") .with_tenant_scope(""); let tenant_tags: Vec<String> = caps
.tags
.iter()
.map(|t| t.to_string())
.filter(|s| s.starts_with(TAG_SCOPE_TENANT_PREFIX))
.collect();
assert_eq!(
tenant_tags.len(),
1,
"duplicate not deduped: {:?}",
caps.tags
);
assert_eq!(tenant_tags[0], "scope:tenant:oem-123");
}
// Region scope resolves to `Regions`, an empty region adds no tag, and a
// subnet-local tag (added twice, stored once) resolves to `SubnetLocal` even
// when a tenant scope is also present.
#[test]
fn with_region_and_subnet_local_scope_compose_with_resolver() {
use super::super::fold::capability_bridge::scope_from_membership_tags;
let to_wire = |caps: &CapabilitySet| -> Vec<String> {
caps.tags.iter().map(|t| t.to_string()).collect()
};
let caps_region = CapabilitySet::new().with_region_scope("eu-west");
assert!(caps_region.has_tag("scope:region:eu-west"));
assert_eq!(
scope_from_membership_tags(&to_wire(&caps_region)),
CapabilityScope::Regions(vec!["eu-west".to_string()]),
);
let caps_empty_region = CapabilitySet::new().with_region_scope("");
assert!(caps_empty_region.tags.is_empty());
let caps_local = CapabilitySet::new()
.with_tenant_scope("oem-123")
.with_subnet_local_scope()
.with_subnet_local_scope(); let local_tags: Vec<String> = caps_local
.tags
.iter()
.map(|t| t.to_string())
.filter(|s| s.as_str() == TAG_SCOPE_SUBNET_LOCAL)
.collect();
assert_eq!(local_tags.len(), 1);
assert_eq!(
scope_from_membership_tags(&to_wire(&caps_local)),
CapabilityScope::SubnetLocal
);
}
// Helper: builds a `Tag::Reserved` from a prefix and body.
fn reserved_tag(prefix: &str, body: &str) -> Tag {
Tag::Reserved {
prefix: prefix.to_string(),
body: body.to_string(),
}
}
// `require_chain` adds a reserved `causal:` tag whose body is the hash.
#[test]
fn require_chain_emits_causal_reserved_tag() {
let caps = CapabilitySet::new().require_chain("abc123");
assert!(caps.tags.contains(&reserved_tag("causal:", "abc123")));
}
// Calling `require_chain` twice with the same hash yields one `causal:` tag.
#[test]
fn require_chain_is_idempotent() {
let caps = CapabilitySet::new()
.require_chain("abc123")
.require_chain("abc123");
let causal_count = caps
.tags
.iter()
.filter(|t| matches!(t, Tag::Reserved { prefix, .. } if prefix == "causal:"))
.count();
assert_eq!(causal_count, 1);
}
// An empty hash adds no tag.
#[test]
fn require_chain_drops_empty_hash() {
let caps = CapabilitySet::new().require_chain("");
assert!(caps.tags.is_empty());
}
// `require_chain_tip` produces the body `<hash>:<seq>`.
#[test]
fn require_chain_tip_emits_with_seq_separator() {
let caps = CapabilitySet::new().require_chain_tip("abc", 100);
assert!(caps.tags.contains(&reserved_tag("causal:", "abc:100")));
}
// `require_chain_range` produces the body `<hash>[<start>..<end>]`.
#[test]
fn require_chain_range_emits_bracket_form() {
let caps = CapabilitySet::new().require_chain_range("abc", 100, 200);
assert!(caps
.tags
.contains(&reserved_tag("causal:", "abc[100..200]")));
}
// A range whose start equals or exceeds its end adds no tag.
#[test]
fn require_chain_range_drops_inverted_or_equal_range() {
let caps = CapabilitySet::new().require_chain_range("abc", 100, 100);
assert!(caps.tags.is_empty());
let caps = CapabilitySet::new().require_chain_range("abc", 200, 100);
assert!(caps.tags.is_empty());
}
// `require_any_chain` adds one `causal:` tag per supplied hash.
#[test]
fn require_any_chain_emits_one_tag_per_hash() {
let caps = CapabilitySet::new().require_any_chain(["abc", "def", "ghi"]);
assert!(caps.tags.contains(&reserved_tag("causal:", "abc")));
assert!(caps.tags.contains(&reserved_tag("causal:", "def")));
assert!(caps.tags.contains(&reserved_tag("causal:", "ghi")));
assert_eq!(caps.tags.len(), 3);
}
// Empty hashes in the input are skipped.
#[test]
fn require_any_chain_skips_empty_hashes() {
let caps = CapabilitySet::new().require_any_chain(["abc", "", "def"]);
assert_eq!(caps.tags.len(), 2);
}
// `from_fork` adds a reserved `fork-of:` tag whose body is the parent hash.
#[test]
fn from_fork_emits_fork_of_reserved_tag() {
let caps = CapabilitySet::new().from_fork("parent_hash");
assert!(caps.tags.contains(&reserved_tag("fork-of:", "parent_hash")));
}
// `heat_level` produces the body `<hash>=<rate>` with two decimals.
#[test]
fn heat_level_emits_chain_hash_equals_rate_with_two_decimals() {
let caps = CapabilitySet::new().heat_level("abc", 0.85);
assert!(caps.tags.contains(&reserved_tag("heat:", "abc=0.85")));
}
// Rates above 1 are emitted as 1.00 and rates below 0 as 0.00.
#[test]
fn heat_level_clamps_out_of_range_rate() {
let caps = CapabilitySet::new().heat_level("abc", 1.5);
assert!(caps.tags.contains(&reserved_tag("heat:", "abc=1.00")));
let caps = CapabilitySet::new().heat_level("abc", -0.3);
assert!(caps.tags.contains(&reserved_tag("heat:", "abc=0.00")));
}
// NaN and infinite rates add no tag.
#[test]
fn heat_level_drops_non_finite_rate() {
let caps = CapabilitySet::new().heat_level("abc", f64::NAN);
assert!(caps.tags.is_empty());
let caps = CapabilitySet::new().heat_level("abc", f64::INFINITY);
assert!(caps.tags.is_empty());
}
// Chaining all the reserved-tag builders yields seven reserved tags
// (1 chain + 1 tip + 1 range + 2 alternatives + 1 fork + 1 heat).
#[test]
fn chain_helpers_compose_naturally_in_a_builder_chain() {
let caps = CapabilitySet::new()
.require_chain("origin-hash")
.require_chain_tip("chain-with-tip", 1024)
.require_chain_range("range-chain", 100, 500)
.require_any_chain(["alt-1", "alt-2"])
.from_fork("parent")
.heat_level("origin-hash", 0.5);
let reserved_count = caps
.tags
.iter()
.filter(|t| matches!(t, Tag::Reserved { .. }))
.count();
assert_eq!(reserved_count, 7, "tags: {:?}", caps.tags);
}
// Converting `&CapabilitySet` into `HardwareCapabilities` returns the value
// that was passed to `with_hardware`.
#[test]
fn projection_hardware_round_trips_via_from_impl() {
let hw_input = HardwareCapabilities::new().with_cpu(8, 16).with_memory(64);
let caps = CapabilitySet::new().with_hardware(hw_input.clone());
let hw_via_from: HardwareCapabilities = (&caps).into();
assert_eq!(hw_via_from, hw_input);
}
// The same holds for `SoftwareCapabilities` and `ResourceLimits`.
#[test]
fn projection_software_and_resource_limits_round_trip() {
let sw_input = SoftwareCapabilities::new().with_os("linux", "6.5");
let limits_input = ResourceLimits::new()
.with_max_concurrent(64)
.with_rate_limit(100);
let caps = CapabilitySet::new()
.with_software(sw_input.clone())
.with_limits(limits_input.clone());
let sw: SoftwareCapabilities = (&caps).into();
assert_eq!(sw, sw_input);
let limits: ResourceLimits = (&caps).into();
assert_eq!(limits, limits_input);
}
// Smoke test of the views handle.
// NOTE(review): the name says five projections, but only hardware, models
// and tools are asserted here.
#[test]
fn views_struct_returns_all_five_projections() {
let caps = sample_capability_set();
let views = caps.views();
assert!(views.hardware().memory_gb > 0);
assert!(!views.models().is_empty());
assert!(!views.tools().is_empty());
}
// Repeated calls to `hardware()` and `models()` on one views handle return
// the same address, i.e. the projection is computed once and reused.
#[test]
fn lazy_view_handle_caches_per_projection() {
let caps = sample_capability_set();
let views = caps.views();
let hw_ptr_1 = views.hardware() as *const _;
let hw_ptr_2 = views.hardware() as *const _;
assert_eq!(hw_ptr_1, hw_ptr_2, "hardware projection must be cached");
let models_ptr_1 = views.models() as *const _;
let models_ptr_2 = views.models() as *const _;
assert_eq!(
models_ptr_1, models_ptr_2,
"models projection must be cached",
);
}
// `typed_tags` followed by `from_typed_tags` preserves the hardware, models
// and resource-limit projections; tools are compared on id, name and version
// only.
#[test]
fn typed_tags_method_round_trips() {
let caps = sample_capability_set();
let tag_set = caps.typed_tags();
let caps2 = CapabilitySet::from_typed_tags(&tag_set);
let v1 = caps.views();
let v2 = caps2.views();
assert_eq!(v1.hardware(), v2.hardware());
assert_eq!(v1.models(), v2.models());
assert_eq!(v1.resource_limits(), v2.resource_limits());
let v1_tools = v1.tools();
let v2_tools = v2.tools();
assert_eq!(v1_tools.len(), v2_tools.len());
for (a, b) in v1_tools.iter().zip(v2_tools.iter()) {
assert_eq!(a.tool_id, b.tool_id);
assert_eq!(a.name, b.name);
assert_eq!(a.version, b.version);
}
}
// A default capability set has no typed tags.
#[test]
fn typed_tags_default_capability_set_is_empty() {
let caps = CapabilitySet::default();
assert!(caps.typed_tags().is_empty());
}
// Diffing two default sets yields an empty diff.
#[test]
fn diff_empty_vs_empty_is_empty() {
let prev = CapabilitySet::default();
let curr = CapabilitySet::default();
let diff = curr.diff(&prev);
assert!(diff.is_empty());
assert!(diff.added_tags.is_empty());
assert!(diff.removed_tags.is_empty());
assert!(diff.changed_metadata.is_empty());
}
// Diffing against an empty previous set reports the tag and the metadata
// entry as added.
#[test]
fn diff_against_empty_reports_full_added() {
let prev = CapabilitySet::default();
let curr = CapabilitySet::new()
.add_tag("inference")
.with_metadata("intent", "ml-training");
let diff = curr.diff(&prev);
assert!(!diff.is_empty());
assert_eq!(diff.added_tags.len(), 1);
let inference_tag = Tag::parse("inference").unwrap();
assert!(diff.added_tags.contains(&inference_tag));
assert!(diff.removed_tags.is_empty());
assert_eq!(diff.changed_metadata.len(), 1);
assert!(matches!(
&diff.changed_metadata[0],
MetadataChange::Added { key, value }
if key == "intent" && value == "ml-training"
));
}
// Tags only in `curr` are reported as added, tags only in `prev` as removed;
// shared tags appear in neither list.
#[test]
fn diff_added_and_removed_tags_are_separated() {
let prev = CapabilitySet::new().add_tag("a").add_tag("b");
let curr = CapabilitySet::new().add_tag("b").add_tag("c");
let diff = curr.diff(&prev);
let added: Vec<_> = diff.added_tags.iter().map(|t| t.to_string()).collect();
let removed: Vec<_> = diff.removed_tags.iter().map(|t| t.to_string()).collect();
assert_eq!(added, vec!["c".to_string()]);
assert_eq!(removed, vec!["a".to_string()]);
}
// Two axis/value tags that differ only in separator produce no added or
// removed tags.
#[test]
fn diff_ignores_separator_form_on_axis_value_tags() {
use crate::adapter::net::behavior::tag::{AxisSeparator, Tag, TaxonomyAxis};
let mut prev = CapabilitySet::new();
prev.tags.insert(Tag::AxisValue {
axis: TaxonomyAxis::Software,
key: "os".to_string(),
value: "linux".to_string(),
separator: AxisSeparator::Eq,
});
let mut curr = CapabilitySet::new();
curr.tags.insert(Tag::AxisValue {
axis: TaxonomyAxis::Software,
key: "os".to_string(),
value: "linux".to_string(),
separator: AxisSeparator::Colon,
});
let diff = curr.diff(&prev);
assert!(
diff.added_tags.is_empty(),
"added tags should be empty for separator-only difference, got {:?}",
diff.added_tags
);
assert!(
diff.removed_tags.is_empty(),
"removed tags should be empty for separator-only difference, got {:?}",
diff.removed_tags
);
}
// A changed value under the same metadata key is reported as one `Updated`
// entry carrying both the previous and the new value.
#[test]
fn diff_metadata_updated_for_value_change() {
let prev = CapabilitySet::new().with_metadata("intent", "ml-training");
let curr = CapabilitySet::new().with_metadata("intent", "embedding");
let diff = curr.diff(&prev);
assert!(diff.added_tags.is_empty());
assert!(diff.removed_tags.is_empty());
assert_eq!(diff.changed_metadata.len(), 1);
match &diff.changed_metadata[0] {
MetadataChange::Updated {
key,
prev_value,
new_value,
} => {
assert_eq!(key, "intent");
assert_eq!(prev_value, "ml-training");
assert_eq!(new_value, "embedding");
}
other => panic!("expected Updated, got {other:?}"),
}
}
// The same value under a different key is reported as `Removed` plus
// `Added`, not as `Updated`.
#[test]
fn diff_metadata_key_rename_is_remove_plus_add_not_update() {
let prev = CapabilitySet::new().with_metadata("old-key", "v");
let curr = CapabilitySet::new().with_metadata("new-key", "v");
let diff = curr.diff(&prev);
assert_eq!(diff.changed_metadata.len(), 2);
let kinds: Vec<_> = diff
.changed_metadata
.iter()
.map(|c| match c {
MetadataChange::Added { key, .. } => format!("added:{key}"),
MetadataChange::Removed { key, .. } => format!("removed:{key}"),
MetadataChange::Updated { key, .. } => format!("updated:{key}"),
})
.collect();
assert!(
kinds.contains(&"added:new-key".to_string())
&& kinds.contains(&"removed:old-key".to_string()),
"expected Added(new-key) + Removed(old-key); got {kinds:?}"
);
}
// Metadata changes come out sorted by key, independent of insertion order.
#[test]
fn diff_changed_metadata_preserves_btreemap_ordering() {
let prev = CapabilitySet::default();
let curr = CapabilitySet::new()
.with_metadata("zebra", "z")
.with_metadata("alpha", "a")
.with_metadata("middle", "m");
let diff = curr.diff(&prev);
let keys: Vec<_> = diff
.changed_metadata
.iter()
.map(|c| match c {
MetadataChange::Added { key, .. }
| MetadataChange::Removed { key, .. }
| MetadataChange::Updated { key, .. } => key.clone(),
})
.collect();
assert_eq!(keys, vec!["alpha", "middle", "zebra"]);
}
// Applying the `DiffEngine` ops for prev -> curr onto prev reproduces curr's
// tags, and `CapabilitySet::diff` reports two metadata changes for the same
// pair.
#[test]
fn diff_round_trips_via_apply_diff_on_canonical_diff_engine() {
use crate::adapter::net::behavior::diff::{CapabilityDiff, DiffEngine};
let prev = CapabilitySet::new()
.add_tag("inference")
.with_metadata("intent", "old");
let curr = prev
.clone()
.add_tag("training")
.with_metadata("intent", "new")
.with_metadata("colocate-with", "chain-a");
let ops = DiffEngine::diff(&prev, &curr);
let applied =
DiffEngine::apply_with_version(&prev, 1, &CapabilityDiff::new(1, 1, 2, ops), true)
.unwrap();
assert_eq!(applied.tags, curr.tags);
let cset_diff = curr.diff(&prev);
assert!(!cset_diff.is_empty());
assert_eq!(cset_diff.changed_metadata.len(), 2);
}
// Snapshot of the JSON shape: `tags` and `metadata` keys are present,
// hardware is encoded as `hardware.*` tags, and the `hardware`, `software`,
// `models`, `tools` and `limits` object keys are absent.
#[test]
fn wire_format_serialization_snapshot() {
let caps = CapabilitySet::new()
.with_hardware(HardwareCapabilities::new().with_cpu(8, 16))
.add_tag("inference");
let json = String::from_utf8(caps.to_bytes()).unwrap();
assert!(json.contains("\"tags\":"), "missing tags field: {json}");
assert!(
json.contains("\"metadata\":"),
"missing metadata field: {json}"
);
assert!(json.contains("\"inference\""), "missing legacy tag: {json}");
assert!(
json.contains("\"hardware.cpu_cores=8\""),
"missing hardware.cpu_cores=8 tag: {json}",
);
assert!(
json.contains("\"hardware.cpu_threads=16\""),
"missing hardware.cpu_threads=16 tag: {json}",
);
assert!(
!json.contains("\"hardware\":"),
"stale hardware key: {json}"
);
assert!(
!json.contains("\"software\":"),
"stale software key: {json}"
);
assert!(!json.contains("\"models\":"), "stale models key: {json}");
assert!(!json.contains("\"tools\":"), "stale tools key: {json}");
assert!(!json.contains("\"limits\":"), "stale limits key: {json}");
}
// The full fixture compares equal to itself after `to_bytes` / `from_bytes`.
#[test]
fn wire_format_round_trips_through_json() {
let caps = sample_capability_set();
let bytes = caps.to_bytes();
let caps2 = CapabilitySet::from_bytes(&bytes).expect("round-trip parses");
assert_eq!(caps, caps2);
}
// `typed_tags` contains the plain "inference" tag as `Tag::Legacy` and the
// tenant scope as `Tag::Reserved` with prefix `scope:` and body `tenant:acme`.
#[test]
fn typed_tags_includes_legacy_string_tags() {
use crate::adapter::net::behavior::tag::Tag as TagT;
let caps = CapabilitySet::new()
.add_tag("inference")
.with_tenant_scope("acme");
let tag_set = caps.typed_tags();
assert!(tag_set
.iter()
.any(|t| matches!(t, TagT::Legacy(s) if s == "inference")));
assert!(tag_set
.iter()
.any(|t| matches!(t, TagT::Reserved { prefix, body }
if prefix == "scope:" && body == "tenant:acme")));
}
}