use std::collections::HashMap;
use std::sync::Arc;
use crate::face::{FaceError, FaceWatchEvent, FaceWatchEventKind, FaceWatchStream, ResourceFormat, ResourceRef};
use crate::Cluster;
/// Strategy used by a federation to decide which member cluster serves a request.
///
/// Member clusters are identified by their position (index) in the federation's
/// member list.
#[derive(Clone, Debug)]
pub enum RoutingPolicy {
    /// Send every reference-addressed request to the first member (index `0`).
    First,
    /// Round-robin selection.
    ///
    /// NOTE(review): the selection logic visible in this file resolves this
    /// variant to member `0`, exactly like [`RoutingPolicy::First`] — confirm
    /// whether real rotation is still to be implemented.
    RoundRobin,
    /// Route by the namespace of the resource.
    ///
    /// Despite the variant name, the lookup performed in this file is an exact
    /// match of the namespace string against the keys of `map`.
    NamespacePrefix {
        /// Namespace name -> index of the member that owns it.
        map: HashMap<String, usize>,
        /// Member index used when the namespace is absent or has no entry in
        /// `map`; `None` means such requests have no route.
        default_member: Option<usize>,
    },
}
impl RoutingPolicy {
fn pick_for_ref(&self, reference: &ResourceRef, member_count: usize) -> Option<usize> {
match self {
Self::First => (member_count > 0).then_some(0),
Self::RoundRobin => (member_count > 0).then_some(0),
Self::NamespacePrefix {
map,
default_member,
} => match reference.namespace.as_deref() {
Some(ns) => map.get(ns).copied().or(*default_member),
None => *default_member,
},
}
}
fn pick_for_kind_namespace(
&self,
_kind: &str,
namespace: Option<&str>,
_member_count: usize,
) -> Option<usize> {
match self {
Self::First | Self::RoundRobin => None,
Self::NamespacePrefix {
map,
default_member,
} => match namespace {
Some(ns) => map.get(ns).copied().or(*default_member),
None => *default_member,
},
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum FederationError {
#[error("federation requires ≥1 member cluster; got 0")]
Empty,
#[error("routing policy yielded no member for {0:?}")]
NoRoute(ResourceRef),
#[error("routing policy default_member index {0} out of bounds for {1} members")]
BadDefaultIndex(usize, usize),
#[error("member {0} face error: {1}")]
Member(usize, #[source] FaceError),
}
/// A group of member clusters addressed through a single [`RoutingPolicy`].
#[must_use = "FederatedFabric carries cluster handles; consume verbs through it"]
pub struct FederatedFabric {
    /// Member clusters; routing indices are positions in this list.
    members: Vec<Arc<Cluster>>,
    /// Policy that maps each request onto a member index.
    routing: RoutingPolicy,
}
impl FederatedFabric {
    /// Builds a federation over `members`, validating `routing` against the
    /// number of members.
    ///
    /// # Errors
    ///
    /// * [`FederationError::Empty`] when `members` is empty.
    /// * [`FederationError::BadDefaultIndex`] when a `NamespacePrefix` policy
    ///   names a member index that is not below `members.len()`. The
    ///   `default_member` is checked first, then the namespace map entries (in
    ///   the map's iteration order); the first offender is reported.
    pub fn new(members: Vec<Arc<Cluster>>, routing: RoutingPolicy) -> Result<Self, FederationError> {
        let member_count = members.len();
        if member_count == 0 {
            return Err(FederationError::Empty);
        }
        if let RoutingPolicy::NamespacePrefix {
            map,
            default_member,
        } = &routing
        {
            // Validating up front is what lets the verbs below index
            // `self.members` directly with whatever the policy returns.
            let out_of_range = default_member
                .iter()
                .chain(map.values())
                .copied()
                .find(|&idx| idx >= member_count);
            if let Some(idx) = out_of_range {
                return Err(FederationError::BadDefaultIndex(idx, member_count));
            }
        }
        Ok(Self { members, routing })
    }

    /// The member clusters, in routing-index order.
    #[must_use]
    pub fn members(&self) -> &[Arc<Cluster>] {
        self.members.as_slice()
    }

    /// The routing policy this federation was built with.
    #[must_use]
    pub fn routing(&self) -> &RoutingPolicy {
        &self.routing
    }

    /// Resolves `reference` to `(member index, member)` through the routing
    /// policy, or [`FederationError::NoRoute`] when the policy yields nothing.
    fn route(&self, reference: &ResourceRef) -> Result<(usize, &Arc<Cluster>), FederationError> {
        match self.routing.pick_for_ref(reference, self.members.len()) {
            Some(idx) => Ok((idx, &self.members[idx])),
            None => Err(FederationError::NoRoute(reference.clone())),
        }
    }

    /// Applies `body` (encoded as `format`) on the member that `reference`
    /// routes to.
    ///
    /// `reference` is used for routing only; the member receives just `format`
    /// and `body`.
    ///
    /// # Errors
    ///
    /// [`FederationError::NoRoute`] if the policy has no member for `reference`;
    /// [`FederationError::Member`] if the chosen member rejects the call.
    pub fn apply(
        &self,
        reference: &ResourceRef,
        format: ResourceFormat,
        body: &[u8],
    ) -> Result<(), FederationError> {
        let (idx, member) = self.route(reference)?;
        member
            .apply(format, body)
            .map_err(|err| FederationError::Member(idx, err))
    }

    /// Fetches the resource `reference` from the member it routes to.
    ///
    /// # Errors
    ///
    /// [`FederationError::NoRoute`] if the policy has no member for `reference`;
    /// [`FederationError::Member`] if the chosen member fails the read.
    pub fn get(
        &self,
        reference: &ResourceRef,
        format: ResourceFormat,
    ) -> Result<Vec<u8>, FederationError> {
        let (idx, member) = self.route(reference)?;
        member
            .get(reference, format)
            .map_err(|err| FederationError::Member(idx, err))
    }

    /// Lists resources of `kind` in `namespace`.
    ///
    /// If the policy resolves the kind/namespace pair to one member, only that
    /// member is queried and its error, if any, is returned. Otherwise every
    /// member is queried in index order and the results are concatenated. That
    /// fan-out also happens for a `NamespacePrefix` policy when the namespace is
    /// unmapped and there is no default member.
    ///
    /// # Errors
    ///
    /// In the fan-out case member errors are tolerated as long as at least one
    /// entry was collected; when nothing was collected and some member failed,
    /// the first failure is returned as [`FederationError::Member`].
    pub fn list(
        &self,
        kind: &str,
        namespace: Option<&str>,
        format: ResourceFormat,
    ) -> Result<Vec<Vec<u8>>, FederationError> {
        let routed = self
            .routing
            .pick_for_kind_namespace(kind, namespace, self.members.len());
        if let Some(idx) = routed {
            return self.members[idx]
                .list(kind, namespace, format)
                .map_err(|err| FederationError::Member(idx, err));
        }

        let mut merged: Vec<Vec<u8>> = Vec::new();
        let mut first_failure: Option<(usize, FaceError)> = None;
        for (idx, member) in self.members.iter().enumerate() {
            match member.list(kind, namespace, format) {
                Ok(entries) => merged.extend(entries),
                // Only the earliest failure is remembered; later ones are dropped.
                Err(err) if first_failure.is_none() => first_failure = Some((idx, err)),
                Err(_) => {}
            }
        }

        match first_failure {
            Some((idx, err)) if merged.is_empty() => Err(FederationError::Member(idx, err)),
            _ => Ok(merged),
        }
    }

    /// Deletes the resource `reference` on the member it routes to.
    ///
    /// # Errors
    ///
    /// [`FederationError::NoRoute`] if the policy has no member for `reference`;
    /// [`FederationError::Member`] if the chosen member fails the delete.
    pub fn delete(&self, reference: &ResourceRef) -> Result<(), FederationError> {
        let (idx, member) = self.route(reference)?;
        member
            .delete(reference)
            .map_err(|err| FederationError::Member(idx, err))
    }

    /// Opens a watch on resources of `kind` in `namespace`.
    ///
    /// If the policy resolves the kind/namespace pair to one member, that
    /// member's stream is returned as-is. Otherwise a watch is opened on every
    /// member and one thread per successfully opened stream forwards its events
    /// into a shared channel; the returned stream reads from that channel.
    ///
    /// A forwarder thread ends when its member stream yields `Ok(None)` or an
    /// error, or when sending fails because the returned stream was dropped.
    /// Errors from member streams are not surfaced to the consumer. A send is
    /// only attempted after a member stream produces an event, so a dropped
    /// consumer is noticed no earlier than the member's next event.
    ///
    /// # Errors
    ///
    /// In the fan-out case, members that fail to open a watch are skipped; if no
    /// member could be watched, the first failure is returned as
    /// [`FederationError::Member`].
    pub fn watch(
        &self,
        kind: &str,
        namespace: Option<&str>,
        format: ResourceFormat,
    ) -> Result<Box<dyn FaceWatchStream>, FederationError> {
        let routed = self
            .routing
            .pick_for_kind_namespace(kind, namespace, self.members.len());
        if let Some(idx) = routed {
            return self.members[idx]
                .watch(kind, namespace, format)
                .map_err(|err| FederationError::Member(idx, err));
        }

        let (tx, rx) = std::sync::mpsc::channel();
        let mut forwarders = 0usize;
        let mut first_failure: Option<(usize, FaceError)> = None;
        for (idx, member) in self.members.iter().enumerate() {
            match member.watch(kind, namespace, format) {
                Ok(mut stream) => {
                    let sender = tx.clone();
                    std::thread::spawn(move || {
                        while let Ok(Some(event)) = stream.next_event() {
                            if sender.send(event).is_err() {
                                // Receiver is gone; nobody is listening any more.
                                break;
                            }
                        }
                    });
                    forwarders += 1;
                }
                Err(err) if first_failure.is_none() => first_failure = Some((idx, err)),
                Err(_) => {}
            }
        }
        // Drop the original sender so the channel closes once every forwarder
        // has finished; otherwise the merged stream could never report its end.
        drop(tx);

        if forwarders == 0 {
            if let Some((idx, err)) = first_failure {
                return Err(FederationError::Member(idx, err));
            }
        }
        Ok(Box::new(FederatedWatchStream { rx }))
    }
}
/// Watch stream that yields the events forwarded from the member watch streams
/// opened by `FederatedFabric::watch`.
struct FederatedWatchStream {
    /// Receiving end of the channel the forwarder threads send into.
    rx: std::sync::mpsc::Receiver<FaceWatchEvent>,
}

impl FaceWatchStream for FederatedWatchStream {
    /// Waits for the next forwarded event.
    ///
    /// Returns `Ok(None)` once every sender has been dropped, i.e. when all
    /// forwarders have finished. This implementation never returns `Err`.
    fn next_event(&mut self) -> Result<Option<FaceWatchEvent>, FaceError> {
        Ok(self.rx.recv().ok())
    }
}
pub use crate::face::FaceWatchEventKind as FederatedEventKind;
impl std::fmt::Debug for FederatedFabric {
    /// Formats the federation as its member count plus routing policy; the
    /// member clusters themselves are not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let member_count = self.members.len();
        let mut builder = f.debug_struct("FederatedFabric");
        builder.field("member_count", &member_count);
        builder.field("routing", &self.routing);
        builder.finish()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::face::encode_native_envelope;
    use crate::fabric::{FabricStrategy, FaceKind, FabricFace};
    use crate::topology::Quorum3M;
    use crate::Cluster;

    /// Starts a fresh cluster whose face is named `name`.
    fn cluster(name: &str) -> Arc<Cluster> {
        let started = Cluster::builder()
            .strategy(FabricStrategy::prescribed_homelab())
            .face(FabricFace {
                name: name.into(),
                kind: FaceKind::PureRaft,
            })
            .topology(Quorum3M)
            .start()
            .expect("cluster start");
        Arc::new(started)
    }

    /// Reference to a `Pod` called `name` in namespace `ns`.
    fn pod_ref(name: &str, ns: &str) -> ResourceRef {
        ResourceRef::namespaced("Pod", name, ns)
    }

    /// Native envelope wrapping `payload` for `reference`.
    fn envelope(reference: &ResourceRef, payload: &[u8]) -> Vec<u8> {
        encode_native_envelope(reference, payload).unwrap()
    }

    /// `NamespacePrefix` policy built from `(namespace, member index)` pairs.
    fn namespace_policy(routes: &[(&str, usize)], default_member: Option<usize>) -> RoutingPolicy {
        RoutingPolicy::NamespacePrefix {
            map: routes
                .iter()
                .map(|&(ns, idx)| (ns.to_string(), idx))
                .collect(),
            default_member,
        }
    }

    /// Federation over two fresh clusters, "a" (index 0) and "b" (index 1).
    fn two_member_fabric(policy: RoutingPolicy) -> FederatedFabric {
        FederatedFabric::new(vec![cluster("a"), cluster("b")], policy).unwrap()
    }

    /// Reads `reference` directly from member `idx`, bypassing routing.
    fn member_get(
        fab: &FederatedFabric,
        idx: usize,
        reference: &ResourceRef,
    ) -> Result<Vec<u8>, FaceError> {
        fab.members()[idx].get(reference, ResourceFormat::Native)
    }

    /// Writes `env` directly to member `idx`, bypassing routing.
    fn member_apply(fab: &FederatedFabric, idx: usize, env: &[u8]) {
        fab.members()[idx]
            .apply(ResourceFormat::Native, env)
            .unwrap();
    }

    /// Returns `items` sorted, for order-insensitive comparisons.
    fn sorted(mut items: Vec<Vec<u8>>) -> Vec<Vec<u8>> {
        items.sort();
        items
    }

    #[test]
    fn federation_requires_at_least_one_member() {
        let outcome = FederatedFabric::new(Vec::new(), RoutingPolicy::First);
        assert!(matches!(outcome.unwrap_err(), FederationError::Empty));
    }

    #[test]
    fn federation_validates_default_member_index() {
        let members = vec![cluster("a")];
        let policy = namespace_policy(&[], Some(5));
        match FederatedFabric::new(members, policy).unwrap_err() {
            FederationError::BadDefaultIndex(idx, n) => assert_eq!((idx, n), (5, 1)),
            other => panic!("expected BadDefaultIndex, got {other:?}"),
        }
    }

    #[test]
    fn federation_validates_namespace_map_indices() {
        let members = vec![cluster("a")];
        let policy = namespace_policy(&[("ns", 9)], None);
        let err = FederatedFabric::new(members, policy).unwrap_err();
        assert!(matches!(err, FederationError::BadDefaultIndex(_, _)));
    }

    #[test]
    fn federation_constructs_with_valid_members_and_policy() {
        let members = vec![cluster("a"), cluster("b")];
        let policy = namespace_policy(&[("ns-a", 0), ("ns-b", 1)], Some(0));
        let fab = FederatedFabric::new(members, policy).unwrap();
        assert_eq!(fab.members().len(), 2);
    }

    #[test]
    fn first_policy_routes_all_writes_to_member_zero() {
        let fab = two_member_fabric(RoutingPolicy::First);
        let r = pod_ref("nginx", "default");
        let env = envelope(&r, b"x");

        fab.apply(&r, ResourceFormat::Native, &env).unwrap();

        assert_eq!(member_get(&fab, 0, &r).unwrap(), env);
        assert!(member_get(&fab, 1, &r).is_err());
    }

    #[test]
    fn namespace_policy_routes_writes_by_namespace() {
        let fab = two_member_fabric(namespace_policy(&[("team-a", 0), ("team-b", 1)], None));
        let ra = pod_ref("pod-a", "team-a");
        let rb = pod_ref("pod-b", "team-b");
        let env_a = envelope(&ra, b"A");
        let env_b = envelope(&rb, b"B");

        fab.apply(&ra, ResourceFormat::Native, &env_a).unwrap();
        fab.apply(&rb, ResourceFormat::Native, &env_b).unwrap();

        assert_eq!(member_get(&fab, 0, &ra).unwrap(), env_a);
        assert_eq!(member_get(&fab, 1, &rb).unwrap(), env_b);
        assert!(member_get(&fab, 0, &rb).is_err());
    }

    #[test]
    fn namespace_policy_errors_on_unrouted_namespace_without_default() {
        let fab = FederatedFabric::new(vec![cluster("a")], namespace_policy(&[], None)).unwrap();
        let r = pod_ref("nginx", "wild");
        let env = envelope(&r, b"x");
        match fab.apply(&r, ResourceFormat::Native, &env) {
            Err(FederationError::NoRoute(ref_clone)) => assert_eq!(ref_clone, r),
            other => panic!("expected NoRoute, got {other:?}"),
        }
    }

    #[test]
    fn namespace_policy_falls_through_to_default_member() {
        let fab =
            FederatedFabric::new(vec![cluster("a")], namespace_policy(&[], Some(0))).unwrap();
        let r = pod_ref("nginx", "wild");
        let env = envelope(&r, b"x");

        fab.apply(&r, ResourceFormat::Native, &env).unwrap();

        assert_eq!(member_get(&fab, 0, &r).unwrap(), env);
    }

    #[test]
    fn list_aggregates_across_all_members_for_non_namespace_policy() {
        let fab = two_member_fabric(RoutingPolicy::First);
        let ra = pod_ref("pod-a", "default");
        let rb = pod_ref("pod-b", "default");
        let env_a = envelope(&ra, b"A");
        let env_b = envelope(&rb, b"B");
        member_apply(&fab, 0, &env_a);
        member_apply(&fab, 1, &env_b);

        let all = fab
            .list("Pod", Some("default"), ResourceFormat::Native)
            .unwrap();

        assert_eq!(all.len(), 2);
        assert_eq!(sorted(all), sorted(vec![env_a, env_b]));
    }

    #[test]
    fn list_routes_to_single_member_under_namespace_policy() {
        let fab = two_member_fabric(namespace_policy(&[("team-a", 0), ("team-b", 1)], None));
        let rb = pod_ref("pod-b", "team-b");
        let env_b = envelope(&rb, b"B");
        member_apply(&fab, 1, &env_b);

        let listed = fab
            .list("Pod", Some("team-b"), ResourceFormat::Native)
            .unwrap();

        assert_eq!(listed, vec![env_b]);
    }

    #[test]
    fn delete_routes_through_same_policy_as_apply() {
        let fab = two_member_fabric(namespace_policy(&[("team-a", 0)], None));
        let r = pod_ref("pod-a", "team-a");
        let env = envelope(&r, b"x");

        fab.apply(&r, ResourceFormat::Native, &env).unwrap();
        fab.delete(&r).unwrap();

        assert!(member_get(&fab, 0, &r).is_err());
    }

    #[test]
    fn watch_fans_events_across_all_members() {
        let fab = two_member_fabric(RoutingPolicy::First);
        let mut watch = fab
            .watch("Pod", Some("default"), ResourceFormat::Native)
            .unwrap();
        let ra = pod_ref("pod-a", "default");
        let rb = pod_ref("pod-b", "default");
        let env_a = envelope(&ra, b"A");
        let env_b = envelope(&rb, b"B");
        member_apply(&fab, 0, &env_a);
        member_apply(&fab, 1, &env_b);

        // Arrival order across members is not fixed, so compare sorted bodies.
        let got: Vec<Vec<u8>> = (0..2)
            .map(|_| watch.next_event().unwrap().expect("event").body)
            .collect();

        assert_eq!(sorted(got), sorted(vec![env_a, env_b]));
    }

    #[test]
    fn watch_routes_to_single_member_under_namespace_policy() {
        let fab = two_member_fabric(namespace_policy(&[("team-a", 0), ("team-b", 1)], None));
        let mut watch = fab
            .watch("Pod", Some("team-a"), ResourceFormat::Native)
            .unwrap();
        let ra = pod_ref("only-a", "team-a");
        let rb = pod_ref("not-here", "team-b");
        let env_a = envelope(&ra, b"A");
        let env_b = envelope(&rb, b"B");

        fab.apply(&ra, ResourceFormat::Native, &env_a).unwrap();
        fab.apply(&rb, ResourceFormat::Native, &env_b).unwrap();

        let ev = watch.next_event().unwrap().expect("a event");
        assert_eq!(ev.body, env_a);
    }

    #[test]
    fn member_error_is_tagged_with_member_index() {
        let fab = FederatedFabric::new(vec![cluster("a")], RoutingPolicy::First).unwrap();
        let r = pod_ref("missing", "default");
        match fab.get(&r, ResourceFormat::Native) {
            Err(FederationError::Member(idx, FaceError::Unsupported(_))) => assert_eq!(idx, 0),
            other => panic!("expected Member(0, Unsupported), got {other:?}"),
        }
    }
}