use std::collections::HashSet;
use std::time::Duration;
use serde::{Deserialize, Serialize};
use super::*;
struct CapFold;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct CapPayload {
class_hash: u64,
tags: Vec<String>,
}
#[derive(Debug, Clone)]
struct CapQuery {
class: u64,
required_tag: Option<String>,
}
#[derive(Default)]
struct CapIndex {
by_tag: std::collections::HashMap<String, HashSet<(u64, NodeId)>>,
}
impl FoldIndex<CapFold> for CapIndex {
fn on_insert(&mut self, key: &(u64, NodeId), payload: &CapPayload) {
for tag in &payload.tags {
self.by_tag.entry(tag.clone()).or_default().insert(*key);
}
}
fn on_remove(&mut self, key: &(u64, NodeId), payload: &CapPayload) {
for tag in &payload.tags {
if let Some(set) = self.by_tag.get_mut(tag) {
set.remove(key);
if set.is_empty() {
self.by_tag.remove(tag);
}
}
}
}
fn clear(&mut self) {
self.by_tag.clear();
}
}
impl FoldKind for CapFold {
const KIND_ID: u16 = 0x0F00;
const CHANNEL_PREFIX: &'static str = "test:cap:";
const DEFAULT_TTL: Duration = Duration::from_secs(60);
type Key = (u64, NodeId);
type Payload = CapPayload;
type Query = CapQuery;
type Result = Vec<(u64, NodeId)>;
type Index = CapIndex;
fn key_for(node_id: NodeId, payload: &CapPayload) -> Self::Key {
(payload.class_hash, node_id)
}
fn build_index() -> CapIndex {
CapIndex::default()
}
fn query(state: &FoldState<Self>, index: &CapIndex, q: CapQuery) -> Vec<(u64, NodeId)> {
match &q.required_tag {
Some(tag) => {
index
.by_tag
.get(tag)
.into_iter()
.flat_map(|set| set.iter())
.filter(|(class, _)| *class == q.class)
.copied()
.collect()
}
None => state
.entries
.iter()
.filter(|((class, _), _)| *class == q.class)
.map(|(k, _)| *k)
.collect(),
}
}
}
fn cap_announcement(
node_id: NodeId,
class: u64,
generation: u64,
tags: Vec<&str>,
) -> SignedAnnouncement<CapPayload> {
SignedAnnouncement::placeholder(
CapFold::KIND_ID,
class,
node_id,
generation,
EnvelopeMeta::default(),
CapPayload {
class_hash: class,
tags: tags.into_iter().map(String::from).collect(),
},
)
}
#[test]
fn apply_then_query_round_trips_a_single_announcement() {
let fold: Fold<CapFold> = Fold::new();
let outcome = fold
.apply(cap_announcement(0x42, 0x1000, 1, vec!["gpu", "h100"]))
.expect("apply succeeds");
assert_eq!(outcome, ApplyOutcome::Inserted);
assert_eq!(fold.metrics().applies_inserted(), 1);
assert_eq!(fold.metrics().entries(), 1);
let hits = fold.query(CapQuery {
class: 0x1000,
required_tag: Some("h100".into()),
});
assert_eq!(hits, vec![(0x1000, 0x42)]);
}
#[test]
fn stale_generation_is_rejected_by_default_merge() {
let fold: Fold<CapFold> = Fold::new();
fold.apply(cap_announcement(0x42, 0x1000, 5, vec!["gpu"]))
.expect("gen=5 accepted");
let outcome = fold
.apply(cap_announcement(0x42, 0x1000, 5, vec!["different-tags"]))
.expect("apply returns Ok with Rejected outcome");
assert_eq!(outcome, ApplyOutcome::Rejected);
let outcome = fold
.apply(cap_announcement(0x42, 0x1000, 3, vec!["even-different"]))
.expect("apply returns Ok with Rejected outcome");
assert_eq!(outcome, ApplyOutcome::Rejected);
assert_eq!(fold.metrics().applies_inserted(), 1);
assert_eq!(fold.metrics().applies_rejected(), 2);
fold.with_state(|state| {
let entry = state.entries.get(&(0x1000, 0x42)).expect("entry present");
assert_eq!(entry.generation, 5);
assert_eq!(entry.payload.tags, vec!["gpu".to_string()]);
});
}
#[test]
fn higher_generation_replaces_existing_entry_and_index() {
let fold: Fold<CapFold> = Fold::new();
fold.apply(cap_announcement(0x42, 0x1000, 1, vec!["old-tag"]))
.expect("gen=1 accepted");
let outcome = fold
.apply(cap_announcement(0x42, 0x1000, 2, vec!["new-tag"]))
.expect("gen=2 accepted");
assert_eq!(outcome, ApplyOutcome::Replaced);
assert_eq!(fold.metrics().applies_replaced(), 1);
let old_hits = fold.query(CapQuery {
class: 0x1000,
required_tag: Some("old-tag".into()),
});
assert!(old_hits.is_empty(), "stale tag must be evicted from index");
let new_hits = fold.query(CapQuery {
class: 0x1000,
required_tag: Some("new-tag".into()),
});
assert_eq!(new_hits, vec![(0x1000, 0x42)]);
}
#[test]
fn generation_zero_is_refused_with_invalid_generation_error() {
let fold: Fold<CapFold> = Fold::new();
let result = fold.apply(cap_announcement(0x42, 0x1000, 0, vec!["gpu"]));
match result {
Err(FoldError::InvalidGeneration { node_id }) => assert_eq!(node_id, 0x42),
other => panic!("expected InvalidGeneration, got {other:?}"),
}
assert_eq!(fold.metrics().applies_rejected(), 1);
assert_eq!(fold.metrics().entries(), 0);
}
#[test]
fn evict_node_drops_every_entry_and_index_attachment_for_that_node() {
let fold: Fold<CapFold> = Fold::new();
fold.apply(cap_announcement(0x42, 0x1000, 1, vec!["gpu"]))
.expect("first apply");
fold.apply(cap_announcement(0x42, 0x2000, 1, vec!["tpu"]))
.expect("second apply");
fold.apply(cap_announcement(0x43, 0x1000, 1, vec!["gpu"]))
.expect("third apply");
assert_eq!(fold.metrics().entries(), 3);
fold.evict_node(0x42, "test");
assert_eq!(fold.metrics().entries(), 1);
assert_eq!(fold.metrics().evictions(), 2);
fold.with_state(|state| {
assert!(state.entries.contains_key(&(0x1000, 0x43)));
assert!(!state.entries.contains_key(&(0x1000, 0x42)));
assert!(!state.entries.contains_key(&(0x2000, 0x42)));
assert!(!state.by_node.contains_key(&0x42));
});
let gpu_hits: HashSet<_> = fold
.query(CapQuery {
class: 0x1000,
required_tag: Some("gpu".into()),
})
.into_iter()
.collect();
assert_eq!(gpu_hits, [(0x1000, 0x43)].into_iter().collect());
let tpu_hits = fold.query(CapQuery {
class: 0x2000,
required_tag: Some("tpu".into()),
});
assert!(tpu_hits.is_empty());
}
#[test]
fn snapshot_round_trips_via_restore() {
let fold: Fold<CapFold> = Fold::new();
fold.apply(cap_announcement(0x42, 0x1000, 1, vec!["gpu", "h100"]))
.expect("apply #1");
fold.apply(cap_announcement(0x43, 0x1000, 1, vec!["gpu"]))
.expect("apply #2");
fold.apply(cap_announcement(0x42, 0x2000, 1, vec!["tpu"]))
.expect("apply #3");
let snap = fold.snapshot();
assert_eq!(snap.kind, CapFold::KIND_ID);
assert_eq!(snap.entries.len(), 3);
let restored: Fold<CapFold> = Fold::new();
restored.restore(snap, false).expect("restore succeeds");
assert_eq!(restored.metrics().entries(), 3);
assert_eq!(restored.metrics().snapshots_restored(), 1);
let h100_hits = restored.query(CapQuery {
class: 0x1000,
required_tag: Some("h100".into()),
});
assert_eq!(h100_hits, vec![(0x1000, 0x42)]);
restored
.apply(cap_announcement(0x42, 0x1000, 2, vec!["new-tag"]))
.expect("post-restore apply");
let new_tag = restored.query(CapQuery {
class: 0x1000,
required_tag: Some("new-tag".into()),
});
assert_eq!(new_tag, vec![(0x1000, 0x42)]);
}
#[test]
fn restore_drops_entries_whose_ttl_lapsed_during_downtime() {
let fold: Fold<CapFold> = Fold::with_sweep_interval(Duration::ZERO);
fold.apply(cap_announcement(0x42, 0x1000, 1, vec!["gpu"]))
.expect("apply");
let mut snap = fold.snapshot();
assert_eq!(snap.entries.len(), 1);
let max_expires_ns: u64 = snap
.entries
.iter()
.map(|e| e.expires_offset_ns)
.max()
.unwrap();
let max_expires_us = max_expires_ns / 1000 + 1;
let now_us = crate::adapter::net::current_timestamp_micros();
snap.taken_at_unix_us = now_us.saturating_sub(max_expires_us + 1_000_000);
let restored: Fold<CapFold> = Fold::with_sweep_interval(Duration::ZERO);
restored.restore(snap, false).expect("restore");
assert_eq!(
restored.metrics().entries(),
0,
"entries past their TTL by the restore moment must be dropped"
);
restored.with_state(|state| {
assert!(state.entries.is_empty());
assert!(state.by_node.is_empty());
});
}
#[test]
fn restore_consumes_elapsed_downtime_out_of_remaining_ttl() {
let fold: Fold<CapFold> = Fold::with_sweep_interval(Duration::ZERO);
fold.apply(cap_announcement(0x42, 0x1000, 1, vec!["gpu"]))
.expect("apply");
let mut snap = fold.snapshot();
assert_eq!(snap.entries.len(), 1);
snap.taken_at_unix_us = snap.taken_at_unix_us.saturating_sub(30_000_000);
let restored: Fold<CapFold> = Fold::with_sweep_interval(Duration::ZERO);
let before_restore = std::time::Instant::now();
restored.restore(snap, false).expect("restore");
let after_restore = std::time::Instant::now();
assert_eq!(restored.metrics().entries(), 1);
restored.with_state(|state| {
let entry = state.entries.values().next().expect("entry present");
let remaining_lower = entry.expires_at.saturating_duration_since(after_restore);
let remaining_upper = entry.expires_at.saturating_duration_since(before_restore);
assert!(
remaining_upper <= Duration::from_secs(35),
"restored TTL must consume elapsed downtime (got remaining {:?})",
remaining_upper,
);
assert!(
remaining_lower >= Duration::from_secs(25),
"restored TTL should still carry ~30s (got remaining {:?})",
remaining_lower,
);
});
}
#[test]
fn restore_clock_skew_backwards_treats_as_fresh_snapshot() {
let fold: Fold<CapFold> = Fold::with_sweep_interval(Duration::ZERO);
fold.apply(cap_announcement(0x42, 0x1000, 1, vec!["gpu"]))
.expect("apply");
let mut snap = fold.snapshot();
assert_eq!(snap.entries.len(), 1);
snap.taken_at_unix_us = u64::MAX;
let restored: Fold<CapFold> = Fold::with_sweep_interval(Duration::ZERO);
restored.restore(snap, false).expect("restore");
assert_eq!(restored.metrics().entries(), 1);
}
#[test]
fn restore_over_live_state_without_force_is_refused() {
let fold: Fold<CapFold> = Fold::new();
fold.apply(cap_announcement(0x42, 0x1000, 1, vec!["gpu"]))
.expect("apply");
let snap = fold.snapshot();
let live: Fold<CapFold> = Fold::new();
live.apply(cap_announcement(0x43, 0x1000, 1, vec!["different"]))
.expect("apply on live");
match live.restore(snap, false) {
Err(FoldError::RestoreOverLiveState { current_len }) => assert_eq!(current_len, 1),
other => panic!("expected RestoreOverLiveState, got {other:?}"),
}
live.with_state(|state| {
assert_eq!(state.entries.len(), 1);
assert!(state.entries.contains_key(&(0x1000, 0x43)));
});
}
struct RoutingTestFold;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct RoutePayload {
destination: NodeId,
metric: u32,
}
impl FoldKind for RoutingTestFold {
const KIND_ID: u16 = 0x0F01;
const CHANNEL_PREFIX: &'static str = "test:route:";
const DEFAULT_TTL: Duration = Duration::from_secs(300);
type Key = NodeId;
type Payload = RoutePayload;
type Query = NodeId;
type Result = Option<RoutePayload>;
type Index = NoIndex;
fn key_for(_node_id: NodeId, payload: &RoutePayload) -> NodeId {
payload.destination
}
fn build_index() -> NoIndex {
NoIndex
}
fn merge(
existing: Option<&FoldEntry<Self>>,
incoming: &SignedAnnouncement<RoutePayload>,
) -> MergeAction {
match existing {
None => MergeAction::Insert,
Some(e) if incoming.payload.metric < e.payload.metric => MergeAction::Replace,
_ => MergeAction::Reject,
}
}
fn query(state: &FoldState<Self>, _index: &NoIndex, dest: NodeId) -> Option<RoutePayload> {
state.entries.get(&dest).map(|e| e.payload.clone())
}
}
fn route_announcement(
publisher: NodeId,
dest: NodeId,
metric: u32,
generation: u64,
) -> SignedAnnouncement<RoutePayload> {
SignedAnnouncement::placeholder(
RoutingTestFold::KIND_ID,
0,
publisher,
generation,
EnvelopeMeta::default(),
RoutePayload {
destination: dest,
metric,
},
)
}
#[test]
fn routing_merge_override_picks_lower_metric_across_publishers() {
let fold: Fold<RoutingTestFold> = Fold::new();
fold.apply(route_announcement(0xAA, 0x99, 50, 1))
.expect("publisher AA accepted at metric 50");
let route = fold.query(0x99).expect("destination present");
assert_eq!(route.metric, 50);
fold.apply(route_announcement(0xBB, 0x99, 20, 1))
.expect("publisher BB accepted at metric 20");
let route = fold.query(0x99).expect("destination still present");
assert_eq!(route.metric, 20);
let outcome = fold
.apply(route_announcement(0xCC, 0x99, 100, 100))
.expect("CC rejected by metric");
assert_eq!(outcome, ApplyOutcome::Rejected);
let route = fold.query(0x99).expect("destination still present");
assert_eq!(route.metric, 20, "lower-metric route must stick");
}
#[test]
fn metrics_counts_track_apply_outcomes_and_query_count() {
let fold: Fold<CapFold> = Fold::new();
fold.apply(cap_announcement(0x1, 0x100, 1, vec!["a"]))
.unwrap();
fold.apply(cap_announcement(0x2, 0x100, 1, vec!["b"]))
.unwrap();
fold.apply(cap_announcement(0x3, 0x100, 1, vec!["c"]))
.unwrap();
fold.apply(cap_announcement(0x1, 0x100, 2, vec!["a2"]))
.unwrap();
fold.apply(cap_announcement(0x2, 0x100, 1, vec!["b-stale"]))
.unwrap();
let m = fold.metrics();
assert_eq!(m.applies_inserted(), 3);
assert_eq!(m.applies_replaced(), 1);
assert_eq!(m.applies_rejected(), 1);
assert_eq!(m.applies_total(), 5);
assert_eq!(m.entries(), 3);
assert_eq!(m.queries(), 0);
fold.query(CapQuery {
class: 0x100,
required_tag: None,
});
assert_eq!(m.queries(), 1);
}
use std::sync::Arc;
use crate::adapter::net::identity::EntityKeypair;
use super::dispatch::{DispatchError, FoldRegistry};
use super::wire::placeholder_signature;
use super::wire::WireError;
fn sign_cap_ann(
keypair: &EntityKeypair,
node_id: NodeId,
class: u64,
generation: u64,
tags: Vec<&str>,
) -> SignedAnnouncement<CapPayload> {
SignedAnnouncement::sign(
keypair,
CapFold::KIND_ID,
class,
node_id,
generation,
EnvelopeMeta::default(),
CapPayload {
class_hash: class,
tags: tags.into_iter().map(String::from).collect(),
},
)
.expect("sign succeeds with valid payload")
}
#[test]
fn signed_announcement_round_trips_through_postcard_encode_decode() {
let kp = EntityKeypair::generate();
let ann = sign_cap_ann(&kp, 0x42, 0x1000, 1, vec!["gpu", "h100"]);
let bytes = ann.encode().expect("encode");
let decoded = SignedAnnouncement::<CapPayload>::decode(&bytes).expect("decode");
assert_eq!(decoded.kind, ann.kind);
assert_eq!(decoded.class, ann.class);
assert_eq!(decoded.node_id, ann.node_id);
assert_eq!(decoded.generation, ann.generation);
assert_eq!(decoded.announced_at, ann.announced_at);
assert_eq!(decoded.ttl_secs, ann.ttl_secs);
assert_eq!(decoded.flags, ann.flags);
assert_eq!(decoded.payload, ann.payload);
assert_eq!(decoded.signature, ann.signature);
}
#[test]
fn signature_verifies_against_publisher_identity() {
let kp = EntityKeypair::generate();
let ann = sign_cap_ann(&kp, 0x42, 0x1000, 1, vec!["gpu"]);
ann.verify(kp.entity_id())
.expect("verify must accept untampered envelope");
}
#[test]
fn verify_rejects_signature_from_a_different_keypair() {
let signer = EntityKeypair::generate();
let imposter = EntityKeypair::generate();
let ann = sign_cap_ann(&signer, 0x42, 0x1000, 1, vec!["gpu"]);
match ann.verify(imposter.entity_id()) {
Err(WireError::InvalidSignature) => {}
other => panic!("expected InvalidSignature, got {other:?}"),
}
}
#[test]
fn verify_rejects_tampered_payload() {
let kp = EntityKeypair::generate();
let mut ann = sign_cap_ann(&kp, 0x42, 0x1000, 1, vec!["gpu"]);
ann.payload.tags = vec!["malicious-tag".into()];
match ann.verify(kp.entity_id()) {
Err(WireError::InvalidSignature) => {}
other => panic!("expected InvalidSignature, got {other:?}"),
}
}
#[test]
fn verify_rejects_placeholder_signature_sentinel() {
let kp = EntityKeypair::generate();
let ann = cap_announcement(0x42, 0x1000, 1, vec!["gpu"]);
assert_eq!(ann.signature, placeholder_signature());
match ann.verify(kp.entity_id()) {
Err(WireError::PlaceholderSignature) => {}
other => panic!("expected PlaceholderSignature, got {other:?}"),
}
}
#[test]
fn verify_rejects_signature_of_wrong_length() {
let kp = EntityKeypair::generate();
let mut ann = sign_cap_ann(&kp, 0x42, 0x1000, 1, vec!["gpu"]);
ann.signature.pop();
match ann.verify(kp.entity_id()) {
Err(WireError::BadSignatureLength(len)) => assert_eq!(len, 63),
other => panic!("expected BadSignatureLength, got {other:?}"),
}
}
#[test]
fn decode_and_verify_drives_the_dispatch_hot_path() {
let kp = EntityKeypair::generate();
let ann = sign_cap_ann(&kp, 0x42, 0x1000, 1, vec!["gpu"]);
let bytes = ann.encode().expect("encode");
let verified = SignedAnnouncement::<CapPayload>::decode_and_verify(&bytes, kp.entity_id())
.expect("decode + verify must succeed for a freshly-signed envelope");
assert_eq!(verified.node_id, 0x42);
assert_eq!(verified.payload.tags, vec!["gpu".to_string()]);
}
#[test]
fn fold_registry_routes_envelope_to_correct_fold_by_kind() {
let registry = FoldRegistry::new();
let cap_fold: Arc<Fold<CapFold>> = Arc::new(Fold::new());
let route_fold: Arc<Fold<RoutingTestFold>> = Arc::new(Fold::new());
registry.register(cap_fold.clone());
registry.register(route_fold.clone());
assert_eq!(registry.len(), 2);
assert!(registry.get(CapFold::KIND_ID).is_some());
assert!(registry.get(RoutingTestFold::KIND_ID).is_some());
assert!(registry.get(0xBADD).is_none());
let kp = EntityKeypair::generate();
let cap_ann = sign_cap_ann(&kp, 0x42, 0x1000, 1, vec!["gpu", "h100"]);
let cap_bytes = cap_ann.encode().expect("encode");
let outcome = registry
.dispatch(&cap_bytes, kp.entity_id())
.expect("dispatch succeeds");
assert_eq!(outcome, ApplyOutcome::Inserted);
assert_eq!(cap_fold.metrics().applies_inserted(), 1);
assert_eq!(route_fold.metrics().applies_inserted(), 0);
let hits = cap_fold.query(CapQuery {
class: 0x1000,
required_tag: Some("h100".into()),
});
assert_eq!(hits, vec![(0x1000, 0x42)]);
}
#[test]
fn registry_rejects_envelope_for_unknown_kind() {
let registry = FoldRegistry::new();
let kp = EntityKeypair::generate();
let ann = sign_cap_ann(&kp, 0x42, 0x1000, 1, vec!["gpu"]);
let bytes = ann.encode().expect("encode");
match registry.dispatch(&bytes, kp.entity_id()) {
Err(DispatchError::UnknownKind(k)) => assert_eq!(k, CapFold::KIND_ID),
other => panic!("expected UnknownKind, got {other:?}"),
}
}
#[test]
fn registry_rejects_truncated_envelope() {
let registry = FoldRegistry::new();
let kp = EntityKeypair::generate();
match registry.dispatch(b"", kp.entity_id()) {
Err(DispatchError::Truncated) => {}
other => panic!("empty: expected Truncated, got {other:?}"),
}
match registry.dispatch(b"\x80", kp.entity_id()) {
Err(DispatchError::Truncated) => {}
other => panic!("mid-varint: expected Truncated, got {other:?}"),
}
}
#[test]
fn registry_rejects_envelope_whose_kind_disagrees_with_routed_fold() {
let registry = FoldRegistry::new();
let cap_fold: Arc<Fold<CapFold>> = Arc::new(Fold::new());
registry.register(cap_fold.clone());
let kp = EntityKeypair::generate();
let foreign = SignedAnnouncement::sign(
&kp,
0xFFFF, 0x1000,
0x42,
1,
EnvelopeMeta::default(),
CapPayload {
class_hash: 0x1000,
tags: vec!["gpu".into()],
},
)
.expect("sign");
let bytes = foreign.encode().expect("encode");
let adapter = registry.get(CapFold::KIND_ID).expect("cap fold registered");
match adapter.dispatch(&bytes, kp.entity_id()) {
Err(WireError::KindMismatch { got, expected }) => {
assert_eq!(got, 0xFFFF);
assert_eq!(expected, CapFold::KIND_ID);
}
other => panic!("expected KindMismatch, got {other:?}"),
}
assert_eq!(cap_fold.metrics().applies_inserted(), 0);
}
#[test]
fn registry_can_deregister_a_fold() {
let registry = FoldRegistry::new();
let cap_fold: Arc<Fold<CapFold>> = Arc::new(Fold::new());
registry.register(cap_fold);
assert_eq!(registry.len(), 1);
let removed = registry.deregister(CapFold::KIND_ID);
assert!(removed.is_some());
assert!(registry.is_empty());
}
use super::dispatch::FoldChannelRouter;
use crate::adapter::net::identity::EntityId;
#[test]
fn fold_registry_implements_channel_router_trait() {
let registry = FoldRegistry::new();
let cap_fold: Arc<Fold<CapFold>> = Arc::new(Fold::new());
registry.register(cap_fold.clone());
let registry: Arc<dyn FoldChannelRouter> = Arc::new(registry);
let kp = EntityKeypair::generate();
let ann = sign_cap_ann(&kp, 0x42, 0x1000, 1, vec!["gpu"]);
let bytes = ann.encode().expect("encode");
let outcome = registry
.try_route(kp.entity_id(), &bytes)
.expect("router accepts signed envelope");
assert_eq!(outcome, ApplyOutcome::Inserted);
assert_eq!(cap_fold.metrics().applies_inserted(), 1);
}
#[test]
fn channel_router_surface_propagates_signature_failure() {
let registry = FoldRegistry::new();
let cap_fold: Arc<Fold<CapFold>> = Arc::new(Fold::new());
registry.register(cap_fold.clone());
let router: Arc<dyn FoldChannelRouter> = Arc::new(registry);
let signer = EntityKeypair::generate();
let imposter = EntityKeypair::generate();
let ann = sign_cap_ann(&signer, 0x42, 0x1000, 1, vec!["gpu"]);
let bytes = ann.encode().expect("encode");
match router.try_route(imposter.entity_id(), &bytes) {
Err(DispatchError::Wire(WireError::InvalidSignature)) => {}
other => panic!("expected InvalidSignature, got {other:?}"),
}
assert_eq!(cap_fold.metrics().applies_inserted(), 0);
}
#[test]
fn channel_router_drops_envelope_for_unknown_kind() {
let registry = FoldRegistry::new();
let router: Arc<dyn FoldChannelRouter> = Arc::new(registry);
let kp = EntityKeypair::generate();
let ann = sign_cap_ann(&kp, 0x42, 0x1000, 1, vec!["gpu"]);
let bytes = ann.encode().expect("encode");
match router.try_route(kp.entity_id(), &bytes) {
Err(DispatchError::UnknownKind(k)) => assert_eq!(k, CapFold::KIND_ID),
other => panic!("expected UnknownKind, got {other:?}"),
}
}
#[test]
fn subprotocol_fold_id_is_stable() {
assert_eq!(super::dispatch::SUBPROTOCOL_FOLD, 0x1000);
}
#[test]
fn entity_id_is_what_the_router_trait_takes() {
fn _accepts_entity_id<R: FoldChannelRouter>(
r: &R,
e: &EntityId,
b: &[u8],
) -> Result<ApplyOutcome, DispatchError> {
r.try_route(e, b)
}
let registry = FoldRegistry::new();
let _registered: Arc<dyn FoldChannelRouter> = Arc::new(registry);
}
#[test]
fn publisher_to_receiver_full_pipeline_in_process() {
let publisher_kp = EntityKeypair::generate();
let ann = sign_cap_ann(&publisher_kp, 0x42, 0x1000, 1, vec!["gpu", "h100"]);
let wire_bytes = ann.encode().expect("publisher: encode succeeds");
let registry = FoldRegistry::new();
let cap_fold: Arc<Fold<CapFold>> = Arc::new(Fold::new());
registry.register(cap_fold.clone());
let router: Arc<dyn FoldChannelRouter> = Arc::new(registry);
let outcome = router
.try_route(publisher_kp.entity_id(), &wire_bytes)
.expect("receiver: dispatch succeeds for valid envelope");
assert_eq!(outcome, ApplyOutcome::Inserted);
let hits = cap_fold.query(CapQuery {
class: 0x1000,
required_tag: Some("h100".into()),
});
assert_eq!(hits, vec![(0x1000, 0x42)]);
assert_eq!(cap_fold.metrics().applies_inserted(), 1);
}
#[test]
fn publisher_encode_is_stable_across_calls() {
let kp = EntityKeypair::generate();
let ann1 = sign_cap_ann(&kp, 0x42, 0x1000, 1, vec!["gpu", "h100"]);
let ann2 = ann1.clone();
assert_eq!(
ann1.encode().expect("first encode"),
ann2.encode().expect("second encode"),
"wire encoding must be deterministic across repeated encode() calls"
);
}
#[test]
fn receiver_rejects_envelope_signed_for_a_different_publisher() {
let real_publisher = EntityKeypair::generate();
let session_owner = EntityKeypair::generate();
let ann = sign_cap_ann(&real_publisher, 0x42, 0x1000, 1, vec!["gpu"]);
let wire_bytes = ann.encode().expect("encode");
let registry = FoldRegistry::new();
let cap_fold: Arc<Fold<CapFold>> = Arc::new(Fold::new());
registry.register(cap_fold.clone());
let router: Arc<dyn FoldChannelRouter> = Arc::new(registry);
match router.try_route(session_owner.entity_id(), &wire_bytes) {
Err(DispatchError::Wire(WireError::InvalidSignature)) => {}
other => panic!("expected InvalidSignature, got {other:?}"),
}
assert_eq!(
cap_fold.metrics().applies_inserted(),
0,
"no apply may be credited to the fold when verify fails"
);
}
use super::audit::{NoopSink, VecFoldAuditSink};
fn sign_cap_ann_with_ttl(
keypair: &EntityKeypair,
node_id: NodeId,
class: u64,
generation: u64,
ttl_secs: u32,
tags: Vec<&str>,
) -> SignedAnnouncement<CapPayload> {
SignedAnnouncement::sign(
keypair,
CapFold::KIND_ID,
class,
node_id,
generation,
EnvelopeMeta {
ttl_secs: Some(ttl_secs),
..Default::default()
},
CapPayload {
class_hash: class,
tags: tags.into_iter().map(String::from).collect(),
},
)
.expect("sign succeeds")
}
#[test]
fn sweep_expired_removes_entries_past_ttl() {
let fold: Fold<CapFold> = Fold::with_sweep_interval(std::time::Duration::ZERO);
let kp = EntityKeypair::generate();
fold.apply(sign_cap_ann_with_ttl(&kp, 0xA, 0x100, 1, 0, vec!["a"]))
.expect("a accepted");
fold.apply(sign_cap_ann_with_ttl(&kp, 0xB, 0x100, 1, 0, vec!["b"]))
.expect("b accepted");
fold.apply(sign_cap_ann_with_ttl(&kp, 0xC, 0x100, 1, 300, vec!["c"]))
.expect("c accepted");
assert_eq!(fold.metrics().entries(), 3);
assert_eq!(fold.metrics().expiries(), 0);
std::thread::sleep(std::time::Duration::from_millis(10));
let evicted = fold.sweep_expired_now();
assert_eq!(evicted, 2, "two expired entries evicted, one remains");
assert_eq!(fold.metrics().entries(), 1);
assert_eq!(fold.metrics().expiries(), 2);
fold.with_state(|state| {
assert!(state.entries.contains_key(&(0x100, 0xC)));
assert!(!state.entries.contains_key(&(0x100, 0xA)));
assert!(!state.entries.contains_key(&(0x100, 0xB)));
assert!(!state.by_node.contains_key(&0xA));
assert!(!state.by_node.contains_key(&0xB));
assert!(state.by_node.contains_key(&0xC));
});
assert!(fold
.query(CapQuery {
class: 0x100,
required_tag: Some("a".into()),
})
.is_empty());
assert!(fold
.query(CapQuery {
class: 0x100,
required_tag: Some("b".into()),
})
.is_empty());
assert_eq!(
fold.query(CapQuery {
class: 0x100,
required_tag: Some("c".into()),
}),
vec![(0x100, 0xC)]
);
}
#[test]
fn sweep_with_no_expired_entries_is_a_no_op() {
let fold: Fold<CapFold> = Fold::with_sweep_interval(std::time::Duration::ZERO);
let kp = EntityKeypair::generate();
fold.apply(sign_cap_ann_with_ttl(&kp, 0xA, 0x100, 1, 300, vec!["a"]))
.expect("a accepted");
let evicted = fold.sweep_expired_now();
assert_eq!(evicted, 0);
assert_eq!(fold.metrics().expiries(), 0);
assert_eq!(fold.metrics().entries(), 1);
}
#[test]
fn sweep_evicts_across_multiple_chunks_when_count_exceeds_chunk_size() {
let fold: Fold<CapFold> = Fold::with_sweep_interval(std::time::Duration::ZERO);
let kp = EntityKeypair::generate();
const N: u64 = 1500;
for i in 0..N {
fold.apply(sign_cap_ann_with_ttl(&kp, i, 0x100, 1, 0, vec!["t"]))
.expect("apply");
}
assert_eq!(fold.metrics().entries(), N);
std::thread::sleep(std::time::Duration::from_millis(10));
let evicted = fold.sweep_expired_now();
assert_eq!(
evicted, N as usize,
"all expired entries evicted across chunks"
);
assert_eq!(fold.metrics().entries(), 0);
assert_eq!(fold.metrics().expiries(), N);
}
struct AuditingCapFold;
impl FoldKind for AuditingCapFold {
const KIND_ID: u16 = 0x0F02;
const CHANNEL_PREFIX: &'static str = "test:audit-cap:";
const DEFAULT_TTL: std::time::Duration = std::time::Duration::from_secs(60);
type Key = (u64, NodeId);
type Payload = CapPayload;
type Query = CapQuery;
type Result = Vec<(u64, NodeId)>;
type Index = NoIndex;
fn key_for(node_id: NodeId, payload: &CapPayload) -> Self::Key {
(payload.class_hash, node_id)
}
fn build_index() -> NoIndex {
NoIndex
}
fn query(state: &FoldState<Self>, _index: &NoIndex, q: CapQuery) -> Vec<(u64, NodeId)> {
state
.entries
.iter()
.filter(|((class, _), _)| *class == q.class)
.map(|(k, _)| *k)
.collect()
}
fn audit_event(transition: super::EntryTransition<'_, Self>) -> Option<super::AuditEvent> {
use super::AuditKind;
let (kind, key_repr, detail) = match transition {
super::EntryTransition::Created { key, .. } => {
(AuditKind::Created, format!("{key:?}"), None)
}
super::EntryTransition::Replaced { key, old, new } => (
AuditKind::Replaced,
format!("{key:?}"),
Some(format!("gen {} → {}", old.generation, new.generation)),
),
super::EntryTransition::Rejected { key, .. } => {
(AuditKind::Rejected, format!("{key:?}"), None)
}
super::EntryTransition::Evicted { key, reason, .. } => (
AuditKind::Evicted,
format!("{key:?}"),
Some(reason.to_string()),
),
super::EntryTransition::Expired { key, .. } => {
(AuditKind::Expired, format!("{key:?}"), None)
}
};
Some(super::AuditEvent {
kind,
key_repr,
detail,
})
}
}
fn sign_audit_ann(
kp: &EntityKeypair,
node_id: NodeId,
class: u64,
generation: u64,
ttl_secs: u32,
tags: Vec<&str>,
) -> SignedAnnouncement<CapPayload> {
SignedAnnouncement::sign(
kp,
AuditingCapFold::KIND_ID,
class,
node_id,
generation,
EnvelopeMeta {
ttl_secs: Some(ttl_secs),
..Default::default()
},
CapPayload {
class_hash: class,
tags: tags.into_iter().map(String::from).collect(),
},
)
.expect("sign")
}
#[test]
fn audit_sink_receives_create_replace_evict_and_expire_transitions() {
let fold: Fold<AuditingCapFold> = Fold::with_sweep_interval(std::time::Duration::ZERO);
let sink: Arc<VecFoldAuditSink> = Arc::new(VecFoldAuditSink::new());
fold.set_audit_sink(Some(sink.clone() as Arc<dyn super::FoldAuditSink>));
assert!(fold.has_audit_sink());
let kp = EntityKeypair::generate();
fold.apply(sign_audit_ann(&kp, 0xA, 0x100, 1, 300, vec!["a"]))
.expect("create");
assert_eq!(sink.snapshot().len(), 1);
assert_eq!(sink.snapshot()[0].kind, super::AuditKind::Created);
fold.apply(sign_audit_ann(&kp, 0xA, 0x100, 2, 300, vec!["a2"]))
.expect("replace");
assert_eq!(sink.snapshot().len(), 2);
assert_eq!(sink.snapshot()[1].kind, super::AuditKind::Replaced);
assert_eq!(sink.snapshot()[1].detail.as_deref(), Some("gen 1 → 2"));
fold.apply(sign_audit_ann(&kp, 0xA, 0x100, 2, 300, vec!["bogus"]))
.expect("reject");
assert_eq!(sink.snapshot().len(), 3);
assert_eq!(sink.snapshot()[2].kind, super::AuditKind::Rejected);
fold.evict_node(0xA, "SWIM declared dead");
assert_eq!(sink.snapshot().len(), 4);
assert_eq!(sink.snapshot()[3].kind, super::AuditKind::Evicted);
assert_eq!(
sink.snapshot()[3].detail.as_deref(),
Some("SWIM declared dead")
);
fold.apply(sign_audit_ann(&kp, 0xB, 0x100, 1, 0, vec!["b"]))
.expect("create-for-expire");
std::thread::sleep(std::time::Duration::from_millis(10));
let n = fold.sweep_expired_now();
assert_eq!(n, 1);
let trail = sink.snapshot();
assert_eq!(
trail.last().expect("trail non-empty").kind,
super::AuditKind::Expired
);
}
#[test]
fn audit_sink_can_be_uninstalled() {
let fold: Fold<AuditingCapFold> = Fold::with_sweep_interval(std::time::Duration::ZERO);
let sink: Arc<VecFoldAuditSink> = Arc::new(VecFoldAuditSink::new());
fold.set_audit_sink(Some(sink.clone() as Arc<dyn super::FoldAuditSink>));
let kp = EntityKeypair::generate();
fold.apply(sign_audit_ann(&kp, 0xA, 0x100, 1, 300, vec!["a"]))
.expect("create");
assert_eq!(sink.len(), 1);
fold.set_audit_sink(None);
assert!(!fold.has_audit_sink());
fold.apply(sign_audit_ann(&kp, 0xB, 0x100, 1, 300, vec!["b"]))
.expect("create-2");
assert_eq!(
sink.len(),
1,
"post-uninstall events must not reach the sink"
);
}
#[test]
fn noop_sink_swallows_events_without_storing() {
let fold: Fold<AuditingCapFold> = Fold::with_sweep_interval(std::time::Duration::ZERO);
fold.set_audit_sink(Some(Arc::new(NoopSink) as Arc<dyn super::FoldAuditSink>));
let kp = EntityKeypair::generate();
for i in 0..16 {
fold.apply(sign_audit_ann(&kp, i as u64, 0x100, 1, 300, vec!["t"]))
.expect("apply");
}
assert_eq!(fold.metrics().applies_inserted(), 16);
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn background_sweeper_evicts_expired_entries_on_tick() {
let fold: Fold<CapFold> = Fold::with_sweep_interval(std::time::Duration::from_millis(50));
let kp = EntityKeypair::generate();
fold.apply(sign_cap_ann_with_ttl(&kp, 0xA, 0x100, 1, 0, vec!["a"]))
.expect("apply");
assert_eq!(fold.metrics().entries(), 1);
assert_eq!(fold.metrics().expiries(), 0);
tokio::time::advance(std::time::Duration::from_millis(60)).await;
tokio::task::yield_now().await;
tokio::time::advance(std::time::Duration::from_millis(60)).await;
tokio::task::yield_now().await;
assert_eq!(fold.metrics().expiries(), 1);
assert_eq!(fold.metrics().entries(), 0);
}
use super::audit::RingFoldAuditSink;
use super::metrics::FoldStats;
#[test]
fn fold_stats_snapshot_reflects_live_counters() {
let fold: Fold<CapFold> = Fold::with_sweep_interval(std::time::Duration::ZERO);
fold.apply(cap_announcement(0x1, 0x100, 1, vec!["a"]))
.expect("ins-1");
fold.apply(cap_announcement(0x2, 0x100, 1, vec!["b"]))
.expect("ins-2");
fold.apply(cap_announcement(0x1, 0x100, 2, vec!["a2"]))
.expect("replace");
let _ = fold
.apply(cap_announcement(0x1, 0x100, 1, vec!["stale"]))
.expect("reject"); let _ = fold.query(CapQuery {
class: 0x100,
required_tag: Some("a2".into()),
});
let snap = fold.stats();
assert_eq!(snap.kind, CapFold::KIND_ID);
assert_eq!(snap.channel_prefix, CapFold::CHANNEL_PREFIX);
assert_eq!(snap.entries, 2);
assert_eq!(snap.applies_inserted, 2);
assert_eq!(snap.applies_replaced, 1);
assert_eq!(snap.applies_rejected, 1);
assert_eq!(snap.applies_total, 4);
assert_eq!(snap.expiries, 0);
assert_eq!(snap.evictions, 0);
assert_eq!(snap.queries, 1);
assert_eq!(snap.snapshots_taken, 0);
assert_eq!(snap.snapshots_restored, 0);
assert!(!snap.has_audit_sink);
fold.evict_node(0x2, "test");
let _snap = fold.snapshot();
let restored: Fold<CapFold> = Fold::with_sweep_interval(std::time::Duration::ZERO);
let s2 = fold.snapshot();
restored.restore(s2, false).expect("restore");
fold.set_audit_sink(Some(Arc::new(NoopSink) as Arc<dyn super::FoldAuditSink>));
let snap = fold.stats();
assert_eq!(snap.entries, 1, "after evict_node(0x2): only 0x1 remains");
assert_eq!(snap.evictions, 1);
assert_eq!(snap.snapshots_taken, 2);
assert!(snap.has_audit_sink);
assert_eq!(restored.stats().snapshots_restored, 1);
}
#[test]
fn fold_registry_stats_aggregates_across_kinds() {
let registry = FoldRegistry::new();
let cap: Arc<Fold<CapFold>> = Arc::new(Fold::with_sweep_interval(std::time::Duration::ZERO));
let route: Arc<Fold<RoutingTestFold>> =
Arc::new(Fold::with_sweep_interval(std::time::Duration::ZERO));
registry.register(cap.clone());
registry.register(route.clone());
cap.apply(cap_announcement(0x42, 0x100, 1, vec!["gpu"]))
.unwrap();
cap.apply(cap_announcement(0x43, 0x100, 1, vec!["gpu"]))
.unwrap();
route.apply(route_announcement(0xAA, 0x99, 50, 1)).unwrap();
let stats = registry.stats();
assert_eq!(stats.len(), 2);
let cap_stats = stats
.iter()
.find(|s| s.kind == CapFold::KIND_ID)
.expect("cap stats present");
assert_eq!(cap_stats.entries, 2);
assert_eq!(cap_stats.channel_prefix, CapFold::CHANNEL_PREFIX);
let route_stats = stats
.iter()
.find(|s| s.kind == RoutingTestFold::KIND_ID)
.expect("route stats present");
assert_eq!(route_stats.entries, 1);
assert_eq!(route_stats.channel_prefix, RoutingTestFold::CHANNEL_PREFIX);
}
#[test]
fn fold_stats_round_trips_through_serde_json() {
let stats = FoldStats {
kind: CapFold::KIND_ID,
channel_prefix: "test:cap:".to_string(),
entries: 12,
applies_inserted: 10,
applies_replaced: 3,
applies_rejected: 1,
applies_total: 14,
expiries: 2,
evictions: 0,
queries: 7,
snapshots_taken: 1,
snapshots_restored: 0,
has_audit_sink: true,
};
let json = serde_json::to_string(&stats).expect("serialize");
let parsed: FoldStats = serde_json::from_str(&json).expect("deserialize");
assert_eq!(parsed, stats);
}
#[test]
fn ring_audit_sink_drops_oldest_when_capacity_exceeded() {
let sink = RingFoldAuditSink::new(3);
for i in 0..5 {
sink.record(super::AuditEvent {
kind: super::AuditKind::Created,
key_repr: format!("{}", i),
detail: None,
});
}
let snap = sink.snapshot();
assert_eq!(snap.len(), 3);
let keys: Vec<&str> = snap.iter().map(|e| e.key_repr.as_str()).collect();
assert_eq!(keys, vec!["2", "3", "4"]);
}
#[test]
fn audit_kind_custom_variant_round_trips_through_sink() {
let sink = RingFoldAuditSink::new(2);
sink.record(super::AuditEvent {
kind: super::AuditKind::Custom("reservation_takeover"),
key_repr: "0xCAFE".into(),
detail: Some("expired holder 0xDEAD".into()),
});
let snap = sink.snapshot();
assert_eq!(snap.len(), 1);
assert_eq!(
snap[0].kind,
super::AuditKind::Custom("reservation_takeover")
);
assert_ne!(snap[0].kind, super::AuditKind::Custom("eviction"));
}
#[test]
fn ring_audit_sink_with_zero_capacity_stores_nothing() {
let sink = RingFoldAuditSink::new(0);
sink.record(super::AuditEvent {
kind: super::AuditKind::Created,
key_repr: "x".into(),
detail: None,
});
assert!(sink.is_empty());
assert_eq!(sink.len(), 0);
assert!(sink.snapshot().is_empty());
}
#[test]
fn fold_channel_router_trait_object_exposes_stats() {
let registry = FoldRegistry::new();
let cap_fold: Arc<Fold<CapFold>> =
Arc::new(Fold::with_sweep_interval(std::time::Duration::ZERO));
let route_fold: Arc<Fold<RoutingTestFold>> =
Arc::new(Fold::with_sweep_interval(std::time::Duration::ZERO));
registry.register(cap_fold.clone());
registry.register(route_fold.clone());
let kp = EntityKeypair::generate();
cap_fold
.apply(cap_announcement(0x42, 0x100, 1, vec!["gpu"]))
.unwrap();
route_fold
.apply(route_announcement(0xAA, 0x99, 50, 1))
.unwrap();
let _ = kp;
let router: Arc<dyn FoldChannelRouter> = Arc::new(registry);
let stats = router.stats();
assert_eq!(stats.len(), 2, "registry router reports both folds");
let cap_stats = stats
.iter()
.find(|s| s.kind == CapFold::KIND_ID)
.expect("cap stats present");
assert_eq!(cap_stats.entries, 1);
let route_stats = stats
.iter()
.find(|s| s.kind == RoutingTestFold::KIND_ID)
.expect("route stats present");
assert_eq!(route_stats.entries, 1);
}
#[test]
fn fold_channel_router_stub_returns_its_own_empty_stats() {
struct StubRouter;
impl FoldChannelRouter for StubRouter {
fn try_route(
&self,
_publisher: &EntityId,
_bytes: &[u8],
) -> Result<ApplyOutcome, DispatchError> {
Ok(ApplyOutcome::Inserted)
}
fn stats(&self) -> Vec<FoldStats> {
Vec::new()
}
}
let stub: Arc<dyn FoldChannelRouter> = Arc::new(StubRouter);
assert!(stub.stats().is_empty());
}
#[test]
fn ring_audit_sink_plugs_into_fold_and_captures_transitions() {
let fold: Fold<AuditingCapFold> = Fold::with_sweep_interval(std::time::Duration::ZERO);
let sink = Arc::new(RingFoldAuditSink::new(4));
fold.set_audit_sink(Some(sink.clone() as Arc<dyn super::FoldAuditSink>));
let kp = EntityKeypair::generate();
for i in 0..5 {
fold.apply(sign_audit_ann(&kp, 0x10 + i, 0x100, 1, 300, vec!["t"]))
.expect("apply");
}
let snap = sink.snapshot();
assert_eq!(snap.len(), 4);
for e in &snap {
assert_eq!(e.kind, super::AuditKind::Created);
}
}