#![allow(missing_docs)]
use std::collections::{BTreeSet, HashSet};
use std::sync::Mutex;
use soma_som_core::extension::{AroundRing, Extension};
use soma_som_core::federation::{
DescriptorEnvelope, DescriptorPayload, OrientationSummary, RingFingerprint, federation_ns,
};
use soma_som_core::quad::Tree;
/// Extension state that feeds queued federation envelopes into the ring's data
/// tree and keeps the most recent snapshot of the local ring's outputs.
///
/// Every field sits behind its own `Mutex`, which is why all methods on the
/// bridge take `&self`.
pub struct FederationBridge {
/// Envelopes queued by `push_envelope`; `inject` drains the whole queue.
pending: Mutex<Vec<DescriptorEnvelope>>,
/// `(source_fingerprint, source_cycle)` pairs that `inject` has already
/// processed; a repeated pair is skipped. Entries are never removed.
seen: Mutex<HashSet<(RingFingerprint, u64)>>,
/// Fingerprints of every ring from which `inject` has accepted an envelope.
known_rings: Mutex<BTreeSet<RingFingerprint>>,
/// Snapshot stored by the latest `observe` call; `None` until the first call.
local_descriptor: Mutex<Option<LocalDescriptor>>,
}
/// Snapshot of the local ring's outputs, as recorded by
/// `FederationBridge::observe`.
#[derive(Debug, Clone)]
pub struct LocalDescriptor {
/// The `cycle_index` that was passed to `observe`.
pub cycle_index: u64,
/// Clone of the `ou_output` tree passed to `observe`.
pub ou_snapshot: Tree,
/// Clone of the `su_output` tree passed to `observe`.
pub su_snapshot: Tree,
}
impl FederationBridge {
pub fn new() -> Self {
FederationBridge {
pending: Mutex::new(Vec::new()),
seen: Mutex::new(HashSet::new()),
known_rings: Mutex::new(BTreeSet::new()),
local_descriptor: Mutex::new(None),
}
}
pub fn push_envelope(&self, envelope: DescriptorEnvelope) {
let mut pending = self.pending.lock().unwrap_or_else(|e| e.into_inner());
pending.push(envelope);
}
pub fn known_rings(&self) -> BTreeSet<RingFingerprint> {
self.known_rings
.lock()
.unwrap_or_else(|e| e.into_inner())
.clone()
}
pub fn local_descriptor(&self) -> Option<LocalDescriptor> {
self.local_descriptor
.lock()
.unwrap_or_else(|e| e.into_inner())
.clone()
}
pub fn pending_count(&self) -> usize {
self.pending
.lock()
.unwrap_or_else(|e| e.into_inner())
.len()
}
fn hex_fingerprint(fp: &RingFingerprint) -> String {
fp.iter().map(|b| format!("{b:02x}")).collect()
}
fn extract_health(orientation: &OrientationSummary, cycle: u64) -> Vec<u8> {
let health = serde_json::json!({
"ring_type": orientation.ring_type,
"organ_count": orientation.organs.len(),
"sibling_count": orientation.siblings.len(),
"source_cycle": cycle,
"last_cycle_ns": orientation.last_cycle_ns,
});
serde_json::to_vec(&health).unwrap_or_default()
}
}
impl Default for FederationBridge {
fn default() -> Self {
Self::new()
}
}
impl Extension for FederationBridge {
    /// Returns the fixed name under which this extension identifies itself.
    fn name(&self) -> &str {
        const NAME: &str = "FederationBridge";
        NAME
    }
}
impl AroundRing for FederationBridge {
fn inject(&self, fu_data: &mut Tree) {
let envelopes: Vec<DescriptorEnvelope> = {
let mut pending = self.pending.lock().unwrap_or_else(|e| e.into_inner());
std::mem::take(&mut *pending)
};
if envelopes.is_empty() {
return;
}
let mut seen = self.seen.lock().unwrap_or_else(|e| e.into_inner());
let mut known_rings = self.known_rings.lock().unwrap_or_else(|e| e.into_inner());
for envelope in envelopes {
let dedup_key = (envelope.source_fingerprint, envelope.source_cycle);
if seen.contains(&dedup_key) {
continue;
}
seen.insert(dedup_key);
let hex_fp = Self::hex_fingerprint(&envelope.source_fingerprint);
known_rings.insert(envelope.source_fingerprint);
let descriptor_key =
format!("{}.{hex_fp}", federation_ns::DESCRIPTOR);
let payload_bytes = match &envelope.payload {
DescriptorPayload::Descriptor(data) => data.clone(),
DescriptorPayload::Command { namespace, tree } => {
serde_json::to_vec(&serde_json::json!({
"type": "command",
"namespace": namespace,
"tree": tree,
}))
.unwrap_or_default()
}
DescriptorPayload::Orientation(o) => {
serde_json::to_vec(&serde_json::json!({
"type": "orientation",
"ring_type": o.ring_type,
"organs": o.organs,
"siblings": o.siblings,
"command_namespaces": o.command_namespaces,
"last_cycle_ns": o.last_cycle_ns,
}))
.unwrap_or_default()
}
_ => Vec::new(),
};
fu_data.insert(descriptor_key, payload_bytes);
if let DescriptorPayload::Orientation(ref orientation) = envelope.payload {
let health_key =
format!("{}.{hex_fp}", federation_ns::HEALTH);
let health_bytes =
Self::extract_health(orientation, envelope.source_cycle);
fu_data.insert(health_key, health_bytes);
}
}
let topology: Vec<String> = known_rings
.iter()
.map(Self::hex_fingerprint)
.collect();
let topology_bytes = serde_json::to_vec(&topology).unwrap_or_default();
fu_data.insert(federation_ns::TOPOLOGY.to_string(), topology_bytes);
}
fn observe(&self, cycle_index: u64, ou_output: &Tree, su_output: &Tree) {
let descriptor = LocalDescriptor {
cycle_index,
ou_snapshot: ou_output.clone(),
su_snapshot: su_output.clone(),
};
let mut local = self.local_descriptor.lock().unwrap_or_else(|e| e.into_inner());
*local = Some(descriptor);
}
}
// Unit tests for `FederationBridge`: envelope injection, deduplication,
// topology tracking and local descriptor capture.
#[cfg(test)]
mod tests {
use super::*;
// Three distinct fingerprints, each 32 copies of a single byte value.
fn fp_a() -> RingFingerprint {
[0xAAu8; 32]
}
fn fp_b() -> RingFingerprint {
[0xBBu8; 32]
}
fn fp_c() -> RingFingerprint {
[0xCCu8; 32]
}
// Broadcast envelope carrying the raw descriptor bytes [1, 2, 3]; the
// timestamp is derived from `cycle` and the signature is all zeroes.
fn make_descriptor_envelope(fp: RingFingerprint, cycle: u64) -> DescriptorEnvelope {
DescriptorEnvelope {
source_fingerprint: fp,
destination: soma_som_core::federation::EnvelopeDestination::Broadcast,
payload: DescriptorPayload::Descriptor(vec![1, 2, 3]),
source_cycle: cycle,
timestamp_ns: cycle * 1000,
signature: [0u8; 64],
}
}
// Broadcast envelope carrying an orientation summary with two organs and
// one sibling.
fn make_orientation_envelope(fp: RingFingerprint, cycle: u64) -> DescriptorEnvelope {
DescriptorEnvelope {
source_fingerprint: fp,
destination: soma_som_core::federation::EnvelopeDestination::Broadcast,
payload: DescriptorPayload::Orientation(OrientationSummary {
ring_type: "mother".into(),
organs: vec!["DIRECTOR".into(), "GUARD".into()],
siblings: vec!["GIT".into()],
command_namespaces: vec!["director".into()],
last_cycle_ns: 500_000,
}),
source_cycle: cycle,
timestamp_ns: cycle * 1000,
signature: [0u8; 64],
}
}
// Broadcast envelope carrying a command payload.
fn make_command_envelope(fp: RingFingerprint, cycle: u64) -> DescriptorEnvelope {
DescriptorEnvelope {
source_fingerprint: fp,
destination: soma_som_core::federation::EnvelopeDestination::Broadcast,
payload: DescriptorPayload::Command {
namespace: "guard.policy.list".into(),
tree: vec![0xCA, 0xFE],
},
source_cycle: cycle,
timestamp_ns: cycle * 1000,
signature: [0u8; 64],
}
}
// A descriptor payload is stored byte-for-byte under the descriptor key.
#[test]
fn inject_descriptor_envelope_into_fu_data() {
let bridge = FederationBridge::new();
bridge.push_envelope(make_descriptor_envelope(fp_a(), 5));
let mut fu_data = Tree::new();
bridge.inject(&mut fu_data);
let hex = FederationBridge::hex_fingerprint(&fp_a());
let key = format!("{}.{hex}", federation_ns::DESCRIPTOR);
assert!(fu_data.contains_key(&key));
assert_eq!(fu_data[&key], vec![1, 2, 3]);
}
// An orientation payload yields both a descriptor key and a health key, and
// the health record reflects the summary's counts and the source cycle.
#[test]
fn inject_orientation_produces_health_key() {
let bridge = FederationBridge::new();
bridge.push_envelope(make_orientation_envelope(fp_a(), 10));
let mut fu_data = Tree::new();
bridge.inject(&mut fu_data);
let hex = FederationBridge::hex_fingerprint(&fp_a());
let descriptor_key = format!("{}.{hex}", federation_ns::DESCRIPTOR);
let health_key = format!("{}.{hex}", federation_ns::HEALTH);
assert!(fu_data.contains_key(&descriptor_key));
assert!(fu_data.contains_key(&health_key));
let health: serde_json::Value =
serde_json::from_slice(&fu_data[&health_key]).unwrap();
assert_eq!(health["ring_type"], "mother");
assert_eq!(health["organ_count"], 2);
assert_eq!(health["source_cycle"], 10);
}
// The topology entry lists the hex fingerprint of every ring seen.
#[test]
fn inject_updates_topology() {
let bridge = FederationBridge::new();
bridge.push_envelope(make_descriptor_envelope(fp_a(), 1));
bridge.push_envelope(make_descriptor_envelope(fp_b(), 1));
let mut fu_data = Tree::new();
bridge.inject(&mut fu_data);
let topology_bytes = fu_data.get(federation_ns::TOPOLOGY).unwrap();
let topology: Vec<String> = serde_json::from_slice(topology_bytes).unwrap();
assert_eq!(topology.len(), 2);
assert!(topology.contains(&FederationBridge::hex_fingerprint(&fp_a())));
assert!(topology.contains(&FederationBridge::hex_fingerprint(&fp_b())));
}
// Re-sending the same (fingerprint, cycle) pair must not write the
// descriptor again.
#[test]
fn duplicate_envelope_not_reinjected() {
let bridge = FederationBridge::new();
bridge.push_envelope(make_descriptor_envelope(fp_a(), 5));
let mut fu_data = Tree::new();
bridge.inject(&mut fu_data);
bridge.push_envelope(make_descriptor_envelope(fp_a(), 5));
let mut fu_data2 = Tree::new();
bridge.inject(&mut fu_data2);
// The only entry in the second tree is the topology key.
assert_eq!(fu_data2.len(), 1); assert!(fu_data2.contains_key(federation_ns::TOPOLOGY));
}
// With nothing queued, inject leaves the tree untouched (no topology key).
#[test]
fn empty_pending_no_injection() {
let bridge = FederationBridge::new();
let mut fu_data = Tree::new();
bridge.inject(&mut fu_data);
assert!(fu_data.is_empty());
}
// Each peer gets its own descriptor key.
#[test]
fn multiple_peers_inject_independently() {
let bridge = FederationBridge::new();
bridge.push_envelope(make_descriptor_envelope(fp_a(), 1));
bridge.push_envelope(make_descriptor_envelope(fp_b(), 2));
bridge.push_envelope(make_descriptor_envelope(fp_c(), 3));
let mut fu_data = Tree::new();
bridge.inject(&mut fu_data);
let hex_a = FederationBridge::hex_fingerprint(&fp_a());
let hex_b = FederationBridge::hex_fingerprint(&fp_b());
let hex_c = FederationBridge::hex_fingerprint(&fp_c());
assert!(fu_data.contains_key(&format!("{}.{hex_a}", federation_ns::DESCRIPTOR)));
assert!(fu_data.contains_key(&format!("{}.{hex_b}", federation_ns::DESCRIPTOR)));
assert!(fu_data.contains_key(&format!("{}.{hex_c}", federation_ns::DESCRIPTOR)));
// Three descriptor keys plus the topology key.
assert_eq!(fu_data.len(), 4);
}
// observe stores the cycle index together with clones of both output trees.
#[test]
fn observe_captures_local_descriptor() {
let bridge = FederationBridge::new();
assert!(bridge.local_descriptor().is_none());
let mut ou = Tree::new();
ou.insert("guard.status".into(), b"active".to_vec());
let mut su = Tree::new();
su.insert("orientation".into(), b"mother".to_vec());
bridge.observe(42, &ou, &su);
let desc = bridge.local_descriptor().unwrap();
assert_eq!(desc.cycle_index, 42);
assert_eq!(desc.ou_snapshot["guard.status"], b"active");
assert_eq!(desc.su_snapshot["orientation"], b"mother");
}
// Only orientation payloads produce a health key.
#[test]
fn descriptor_payload_no_health_key() {
let bridge = FederationBridge::new();
bridge.push_envelope(make_descriptor_envelope(fp_a(), 1));
let mut fu_data = Tree::new();
bridge.inject(&mut fu_data);
let hex = FederationBridge::hex_fingerprint(&fp_a());
let health_key = format!("{}.{hex}", federation_ns::HEALTH);
assert!(
!fu_data.contains_key(&health_key),
"Descriptor payload should not produce health key"
);
}
// A command payload is stored as JSON with a "type" tag and its namespace.
#[test]
fn command_payload_injection() {
let bridge = FederationBridge::new();
bridge.push_envelope(make_command_envelope(fp_a(), 1));
let mut fu_data = Tree::new();
bridge.inject(&mut fu_data);
let hex = FederationBridge::hex_fingerprint(&fp_a());
let key = format!("{}.{hex}", federation_ns::DESCRIPTOR);
let value: serde_json::Value =
serde_json::from_slice(&fu_data[&key]).unwrap();
assert_eq!(value["type"], "command");
assert_eq!(value["namespace"], "guard.policy.list");
}
// The Extension impl reports the fixed extension name.
#[test]
fn extension_trait_compliance() {
let bridge = FederationBridge::new();
assert_eq!(bridge.name(), "FederationBridge"); }
// Two cycles from the same ring are both accepted; they share a single
// descriptor key and the ring appears once in the topology.
#[test]
fn same_fingerprint_different_cycles_both_injected() {
let bridge = FederationBridge::new();
bridge.push_envelope(make_descriptor_envelope(fp_a(), 1));
bridge.push_envelope(make_descriptor_envelope(fp_a(), 2));
let mut fu_data = Tree::new();
bridge.inject(&mut fu_data);
let hex = FederationBridge::hex_fingerprint(&fp_a());
let key = format!("{}.{hex}", federation_ns::DESCRIPTOR);
assert!(fu_data.contains_key(&key));
let topology: Vec<String> =
serde_json::from_slice(fu_data.get(federation_ns::TOPOLOGY).unwrap()).unwrap();
assert_eq!(topology.len(), 1);
}
// The known-ring set grows across successive inject calls.
#[test]
fn known_rings_accumulates() {
let bridge = FederationBridge::new();
assert!(bridge.known_rings().is_empty());
bridge.push_envelope(make_descriptor_envelope(fp_a(), 1));
let mut fu_data = Tree::new();
bridge.inject(&mut fu_data);
assert_eq!(bridge.known_rings().len(), 1);
bridge.push_envelope(make_descriptor_envelope(fp_b(), 1));
bridge.inject(&mut fu_data);
assert_eq!(bridge.known_rings().len(), 2);
}
// pending_count reflects queued envelopes and drops to zero after inject.
#[test]
fn pending_count_tracks_queue() {
let bridge = FederationBridge::new();
assert_eq!(bridge.pending_count(), 0);
bridge.push_envelope(make_descriptor_envelope(fp_a(), 1));
bridge.push_envelope(make_descriptor_envelope(fp_b(), 1));
assert_eq!(bridge.pending_count(), 2);
let mut fu_data = Tree::new();
bridge.inject(&mut fu_data);
assert_eq!(bridge.pending_count(), 0);
}
// A single orientation envelope yields exactly three entries:
// descriptor, health and topology.
#[test]
fn orientation_produces_descriptor_and_health() {
let bridge = FederationBridge::new();
bridge.push_envelope(make_orientation_envelope(fp_b(), 7));
let mut fu_data = Tree::new();
bridge.inject(&mut fu_data);
let hex = FederationBridge::hex_fingerprint(&fp_b());
assert!(fu_data.contains_key(&format!("{}.{hex}", federation_ns::DESCRIPTOR)));
assert!(fu_data.contains_key(&format!("{}.{hex}", federation_ns::HEALTH)));
assert!(fu_data.contains_key(federation_ns::TOPOLOGY));
assert_eq!(fu_data.len(), 3);
}
// A later observe call replaces the previously stored snapshot.
#[test]
fn observe_overwrites_previous() {
let bridge = FederationBridge::new();
let ou1 = Tree::new();
let su1 = Tree::new();
bridge.observe(1, &ou1, &su1);
let mut ou2 = Tree::new();
ou2.insert("new_key".into(), b"value".to_vec());
bridge.observe(2, &ou2, &Tree::new());
let desc = bridge.local_descriptor().unwrap();
assert_eq!(desc.cycle_index, 2);
assert!(desc.ou_snapshot.contains_key("new_key"));
}
}