use std::sync::Mutex;
use crate::fabric::{FabricFace, FaceKind};
/// Errors reported by [`Face`] lifecycle and resource operations.
#[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
pub enum FaceError {
/// Returned by `start` when the face is already running.
#[error("face already started")]
AlreadyStarted,
/// Returned by `shutdown` when the face is not running.
#[error("face not started")]
NotStarted,
/// Start failure carrying a free-form description.
// NOTE(review): no code visible in this file constructs this variant.
#[error("face start failed: {0}")]
StartFailed(String),
/// Shutdown failure carrying a free-form description.
// NOTE(review): no code visible in this file constructs this variant.
#[error("face shutdown failed: {0}")]
ShutdownFailed(String),
/// The requested operation is not available; the string explains why.
/// The default `Face` method bodies return this variant.
#[error("face does not support operation: {0}")]
Unsupported(String),
}
/// Contract shared by every fabric face: identity, lifecycle, and optional
/// resource operations.
///
/// Implementors must supply `name`, `kind`, `start`, `shutdown`, and
/// `is_running`. Every other method has a default: the two counters report
/// `0`, and the remaining operations fail with [`FaceError::Unsupported`]
/// carrying a message that names the operation and the face.
pub trait Face: Send + Sync + 'static {
    /// Name of this face.
    fn name(&self) -> &str;

    /// Kind of this face.
    fn kind(&self) -> FaceKind;

    /// Starts the face.
    fn start(&self) -> Result<(), FaceError>;

    /// Shuts the face down.
    fn shutdown(&self) -> Result<(), FaceError>;

    /// Reports whether the face is currently running.
    fn is_running(&self) -> bool;

    /// Number of resources held by the face; `0` unless overridden.
    fn resource_count(&self) -> usize {
        0
    }

    /// Number of watch subscribers; `0` unless overridden.
    fn subscriber_count(&self) -> usize {
        0
    }

    /// Serializes the face's state. Unsupported unless overridden.
    fn snapshot(&self) -> Result<Vec<u8>, FaceError> {
        let reason = format!("snapshot not supported by {}", self.name());
        Err(FaceError::Unsupported(reason))
    }

    /// Restores state from snapshot bytes. Unsupported unless overridden.
    fn restore(&self, _snapshot_bytes: &[u8]) -> Result<(), FaceError> {
        let reason = format!("restore not supported by {}", self.name());
        Err(FaceError::Unsupported(reason))
    }

    /// Applies a resource body given in `_format`. Unsupported unless
    /// overridden.
    fn apply_resource(&self, _format: ResourceFormat, _body: &[u8]) -> Result<(), FaceError> {
        let reason = format!("apply_resource not yet implemented for {}", self.name());
        Err(FaceError::Unsupported(reason))
    }

    /// Fetches one resource encoded in `_format`. Unsupported unless
    /// overridden.
    fn get_resource(
        &self,
        _reference: &ResourceRef,
        _format: ResourceFormat,
    ) -> Result<Vec<u8>, FaceError> {
        let reason = format!("get_resource not yet implemented for {}", self.name());
        Err(FaceError::Unsupported(reason))
    }

    /// Lists resources of `_kind`, optionally restricted to `_namespace`.
    /// Unsupported unless overridden.
    fn list_resources(
        &self,
        _kind: &str,
        _namespace: Option<&str>,
        _format: ResourceFormat,
    ) -> Result<Vec<Vec<u8>>, FaceError> {
        let reason = format!("list_resources not yet implemented for {}", self.name());
        Err(FaceError::Unsupported(reason))
    }

    /// Deletes the referenced resource. Unsupported unless overridden.
    fn delete_resource(&self, _reference: &ResourceRef) -> Result<(), FaceError> {
        let reason = format!("delete_resource not yet implemented for {}", self.name());
        Err(FaceError::Unsupported(reason))
    }

    /// Opens a watch stream over resources of `_kind`. Unsupported unless
    /// overridden.
    fn watch_resources(
        &self,
        _kind: &str,
        _namespace: Option<&str>,
        _format: ResourceFormat,
    ) -> Result<Box<dyn FaceWatchStream>, FaceError> {
        let reason = format!("watch_resources not yet implemented for {}", self.name());
        Err(FaceError::Unsupported(reason))
    }
}
/// Encoding selector passed to the resource operations on [`Face`].
// NOTE(review): the meaning of each variant is inferred from its name only;
// the adapters that interpret them live outside this file. Keep the variant
// order stable in case an index-based serde format is in use.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum ResourceFormat {
Yaml,
Json,
Hcl,
Native,
}
/// Identifies a resource by kind, name, and optional namespace.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ResourceRef {
    /// Resource kind.
    pub kind: String,
    /// Resource name.
    pub name: String,
    /// Namespace, or `None` for a cluster-scoped reference.
    pub namespace: Option<String>,
}

impl ResourceRef {
    /// Creates a reference with no namespace.
    #[must_use]
    pub fn cluster_scoped(kind: impl Into<String>, name: impl Into<String>) -> Self {
        let kind: String = kind.into();
        let name: String = name.into();
        Self {
            kind,
            name,
            namespace: None,
        }
    }

    /// Creates a reference that lives inside `namespace`.
    #[must_use]
    pub fn namespaced(
        kind: impl Into<String>,
        name: impl Into<String>,
        namespace: impl Into<String>,
    ) -> Self {
        let kind: String = kind.into();
        let name: String = name.into();
        let namespace: Option<String> = Some(namespace.into());
        Self {
            kind,
            name,
            namespace,
        }
    }
}
/// Stream of watch events, as returned (boxed) by `Face::watch_resources`.
pub trait FaceWatchStream: Send + 'static {
/// Returns the next event.
// NOTE(review): `Ok(None)` presumably means the stream has ended; the one
// implementation in this file (`MpscWatchStream`) returns it when its channel
// receive fails. Confirm against other implementations.
fn next_event(&mut self) -> Result<Option<FaceWatchEvent>, FaceError>;
}
/// A single event delivered through a [`FaceWatchStream`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FaceWatchEvent {
/// What happened.
pub kind: FaceWatchEventKind,
/// Raw bytes attached to the event.
// NOTE(review): encoding is not visible here; presumably the format requested
// when the watch was opened. TODO confirm.
pub body: Vec<u8>,
}
/// Category of a [`FaceWatchEvent`].
// NOTE(review): the code that emits these lives outside this file, so exact
// semantics (especially of `Reset`) should be confirmed against the store.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum FaceWatchEventKind {
Added,
Modified,
Deleted,
Reset,
}
/// Face for `FaceKind::PureRaft`, backed by an `InMemoryStore`.
pub struct PureRaftFace {
// Name copied from the declaration; returned by `Face::name`.
name: String,
// Lifecycle flag flipped by `start` / `shutdown`.
state: Mutex<FaceState>,
// Store that the resource, watch, counter, and snapshot calls delegate to.
store: crate::face_store::InMemoryStore,
}
/// Two-state lifecycle flag shared by the face implementations in this file.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum FaceState {
// Initial state after construction, and the state after `shutdown`.
Stopped,
// State after a successful `start`.
Running,
}
impl PureRaftFace {
    /// Builds a stopped face from `decl`.
    ///
    /// Returns `None` when the declaration's kind is not
    /// `FaceKind::PureRaft`.
    #[must_use]
    pub fn from_declaration(decl: &FabricFace) -> Option<Self> {
        match &decl.kind {
            FaceKind::PureRaft => Some(Self {
                name: decl.name.clone(),
                state: Mutex::new(FaceState::Stopped),
                store: crate::face_store::InMemoryStore::new(decl.name.clone()),
            }),
            _ => None,
        }
    }

    /// Installs `adapters` on the backing store and hands the face back,
    /// builder style.
    #[must_use]
    pub fn with_adapters(self, adapters: crate::format::AdapterRegistry) -> Self {
        let mut configured = self;
        configured.store.set_adapters(adapters);
        configured
    }
}
// `Face` for `PureRaftFace`: lifecycle is tracked in `state`; every resource,
// counter, and snapshot operation is forwarded to the backing store.
impl Face for PureRaftFace {
    fn name(&self) -> &str {
        self.name.as_str()
    }

    fn kind(&self) -> FaceKind {
        FaceKind::PureRaft
    }

    fn is_running(&self) -> bool {
        let guard = self.state.lock().expect("face state mutex poisoned");
        matches!(*guard, FaceState::Running)
    }

    // Stopped -> Running; a second start is rejected.
    fn start(&self) -> Result<(), FaceError> {
        let mut guard = self.state.lock().expect("face state mutex poisoned");
        match *guard {
            FaceState::Running => Err(FaceError::AlreadyStarted),
            FaceState::Stopped => {
                *guard = FaceState::Running;
                Ok(())
            }
        }
    }

    // Running -> Stopped; shutting down a stopped face is rejected.
    fn shutdown(&self) -> Result<(), FaceError> {
        let mut guard = self.state.lock().expect("face state mutex poisoned");
        match *guard {
            FaceState::Stopped => Err(FaceError::NotStarted),
            FaceState::Running => {
                *guard = FaceState::Stopped;
                Ok(())
            }
        }
    }

    fn resource_count(&self) -> usize {
        self.store.len()
    }

    fn subscriber_count(&self) -> usize {
        self.store.subscriber_count()
    }

    fn snapshot(&self) -> Result<Vec<u8>, FaceError> {
        self.store.snapshot()
    }

    fn restore(&self, snapshot_bytes: &[u8]) -> Result<(), FaceError> {
        self.store.restore(snapshot_bytes)
    }

    fn apply_resource(&self, format: ResourceFormat, body: &[u8]) -> Result<(), FaceError> {
        self.store.apply(format, body)
    }

    fn get_resource(
        &self,
        reference: &ResourceRef,
        format: ResourceFormat,
    ) -> Result<Vec<u8>, FaceError> {
        self.store.get(reference, format)
    }

    fn list_resources(
        &self,
        kind: &str,
        namespace: Option<&str>,
        format: ResourceFormat,
    ) -> Result<Vec<Vec<u8>>, FaceError> {
        self.store.list(kind, namespace, format)
    }

    fn delete_resource(&self, reference: &ResourceRef) -> Result<(), FaceError> {
        self.store.delete(reference)
    }

    fn watch_resources(
        &self,
        kind: &str,
        namespace: Option<&str>,
        format: ResourceFormat,
    ) -> Result<Box<dyn FaceWatchStream>, FaceError> {
        self.store.watch(kind, namespace, format)
    }
}
/// Serialized wrapper pairing a resource reference with its raw payload.
/// Built by `encode_native_envelope`.
#[derive(serde::Serialize, serde::Deserialize)]
struct NativeEnvelope {
// Serialized under the key "ref" rather than the field name.
#[serde(rename = "ref")]
reference: ResourceRef,
// Payload bytes, stored as given.
payload: Vec<u8>,
}
impl serde::Serialize for ResourceRef {
fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
use serde::ser::SerializeStruct;
let mut state = s.serialize_struct("ResourceRef", 3)?;
state.serialize_field("kind", &self.kind)?;
state.serialize_field("name", &self.name)?;
state.serialize_field("namespace", &self.namespace)?;
state.end()
}
}
impl<'de> serde::Deserialize<'de> for ResourceRef {
fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
#[derive(serde::Deserialize)]
struct Wire {
kind: String,
name: String,
namespace: Option<String>,
}
let w = Wire::deserialize(d)?;
Ok(Self {
kind: w.kind,
name: w.name,
namespace: w.namespace,
})
}
}
/// [`FaceWatchStream`] backed by a standard-library mpsc channel.
struct MpscWatchStream {
// Receiving end of the channel that carries the watch events.
rx: std::sync::mpsc::Receiver<FaceWatchEvent>,
}
impl FaceWatchStream for MpscWatchStream {
    /// Blocks until the channel yields an event.
    ///
    /// A failed receive (the sending side is gone) is reported as `Ok(None)`,
    /// never as an error.
    fn next_event(&mut self) -> Result<Option<FaceWatchEvent>, FaceError> {
        let received = self.rx.recv();
        Ok(received.ok())
    }
}
/// Wraps `reference` and `payload` in a `NativeEnvelope` and encodes it with
/// `ciborium`.
///
/// # Errors
///
/// Returns the encoder's error rendered as a `String` when serialization
/// fails.
pub fn encode_native_envelope(reference: &ResourceRef, payload: &[u8]) -> Result<Vec<u8>, String> {
    let envelope = NativeEnvelope {
        reference: reference.clone(),
        payload: payload.to_vec(),
    };
    let mut encoded = Vec::new();
    match ciborium::into_writer(&envelope, &mut encoded) {
        Ok(()) => Ok(encoded),
        Err(err) => Err(err.to_string()),
    }
}
/// Face for `FaceKind::Kubernetes`, backed by an `InMemoryStore`.
pub struct KubernetesFace {
// Name copied from the declaration; returned by `Face::name`.
name: String,
// Version string taken from the declaration's kind.
version: String,
// Certification flag taken from the declaration's kind.
certified_cncf: bool,
// Lifecycle flag flipped by `start` / `shutdown`.
state: Mutex<FaceState>,
// Store that the resource, watch, counter, and snapshot calls delegate to.
store: crate::face_store::InMemoryStore,
}
impl KubernetesFace {
    /// Builds a stopped face from `decl`, copying the version and
    /// certification flag out of its kind.
    ///
    /// Returns `None` when the declaration's kind is not
    /// `FaceKind::Kubernetes`.
    #[must_use]
    pub fn from_declaration(decl: &FabricFace) -> Option<Self> {
        match &decl.kind {
            FaceKind::Kubernetes {
                version,
                certified_cncf,
            } => Some(Self {
                name: decl.name.clone(),
                version: version.clone(),
                certified_cncf: *certified_cncf,
                state: Mutex::new(FaceState::Stopped),
                store: crate::face_store::InMemoryStore::new(decl.name.clone()),
            }),
            _ => None,
        }
    }

    /// Version string carried by the declaration.
    #[must_use]
    pub fn version(&self) -> &str {
        self.version.as_str()
    }

    /// Certification flag carried by the declaration.
    #[must_use]
    pub fn is_cncf_certified(&self) -> bool {
        self.certified_cncf
    }

    /// Installs `adapters` on the backing store and hands the face back,
    /// builder style.
    #[must_use]
    pub fn with_adapters(self, adapters: crate::format::AdapterRegistry) -> Self {
        let mut configured = self;
        configured.store.set_adapters(adapters);
        configured
    }
}
// `Face` for `KubernetesFace`: lifecycle is tracked in `state`; every
// resource, counter, and snapshot operation is forwarded to the backing store.
impl Face for KubernetesFace {
    fn name(&self) -> &str {
        self.name.as_str()
    }

    // Rebuilds the kind from the stored version and certification flag.
    fn kind(&self) -> FaceKind {
        let version = self.version.clone();
        let certified_cncf = self.certified_cncf;
        FaceKind::Kubernetes {
            version,
            certified_cncf,
        }
    }

    fn is_running(&self) -> bool {
        let guard = self.state.lock().expect("face state mutex poisoned");
        matches!(*guard, FaceState::Running)
    }

    // Stopped -> Running; a second start is rejected.
    fn start(&self) -> Result<(), FaceError> {
        let mut guard = self.state.lock().expect("face state mutex poisoned");
        match *guard {
            FaceState::Running => Err(FaceError::AlreadyStarted),
            FaceState::Stopped => {
                *guard = FaceState::Running;
                Ok(())
            }
        }
    }

    // Running -> Stopped; shutting down a stopped face is rejected.
    fn shutdown(&self) -> Result<(), FaceError> {
        let mut guard = self.state.lock().expect("face state mutex poisoned");
        match *guard {
            FaceState::Stopped => Err(FaceError::NotStarted),
            FaceState::Running => {
                *guard = FaceState::Stopped;
                Ok(())
            }
        }
    }

    fn resource_count(&self) -> usize {
        self.store.len()
    }

    fn subscriber_count(&self) -> usize {
        self.store.subscriber_count()
    }

    fn snapshot(&self) -> Result<Vec<u8>, FaceError> {
        self.store.snapshot()
    }

    fn restore(&self, snapshot_bytes: &[u8]) -> Result<(), FaceError> {
        self.store.restore(snapshot_bytes)
    }

    fn apply_resource(&self, format: ResourceFormat, body: &[u8]) -> Result<(), FaceError> {
        self.store.apply(format, body)
    }

    fn get_resource(
        &self,
        reference: &ResourceRef,
        format: ResourceFormat,
    ) -> Result<Vec<u8>, FaceError> {
        self.store.get(reference, format)
    }

    fn list_resources(
        &self,
        kind: &str,
        namespace: Option<&str>,
        format: ResourceFormat,
    ) -> Result<Vec<Vec<u8>>, FaceError> {
        self.store.list(kind, namespace, format)
    }

    fn delete_resource(&self, reference: &ResourceRef) -> Result<(), FaceError> {
        self.store.delete(reference)
    }

    fn watch_resources(
        &self,
        kind: &str,
        namespace: Option<&str>,
        format: ResourceFormat,
    ) -> Result<Box<dyn FaceWatchStream>, FaceError> {
        self.store.watch(kind, namespace, format)
    }
}
/// Face for `FaceKind::Nomad`, backed by an `InMemoryStore`.
pub struct NomadFace {
// Name copied from the declaration; returned by `Face::name`.
name: String,
// Version string taken from the declaration's kind.
version: String,
// Lifecycle flag flipped by `start` / `shutdown`.
state: Mutex<FaceState>,
// Store that the resource, watch, counter, and snapshot calls delegate to.
store: crate::face_store::InMemoryStore,
}
impl NomadFace {
    /// Builds a stopped face from `decl`, copying the version out of its
    /// kind.
    ///
    /// Returns `None` when the declaration's kind is not `FaceKind::Nomad`.
    #[must_use]
    pub fn from_declaration(decl: &FabricFace) -> Option<Self> {
        match &decl.kind {
            FaceKind::Nomad { version } => Some(Self {
                name: decl.name.clone(),
                version: version.clone(),
                state: Mutex::new(FaceState::Stopped),
                store: crate::face_store::InMemoryStore::new(decl.name.clone()),
            }),
            _ => None,
        }
    }

    /// Version string carried by the declaration.
    #[must_use]
    pub fn version(&self) -> &str {
        self.version.as_str()
    }

    /// Installs `adapters` on the backing store and hands the face back,
    /// builder style.
    #[must_use]
    pub fn with_adapters(self, adapters: crate::format::AdapterRegistry) -> Self {
        let mut configured = self;
        configured.store.set_adapters(adapters);
        configured
    }
}
// `Face` for `NomadFace`: lifecycle is tracked in `state`; every resource,
// counter, and snapshot operation is forwarded to the backing store.
impl Face for NomadFace {
    fn name(&self) -> &str {
        self.name.as_str()
    }

    // Rebuilds the kind from the stored version.
    fn kind(&self) -> FaceKind {
        let version = self.version.clone();
        FaceKind::Nomad { version }
    }

    fn is_running(&self) -> bool {
        let guard = self.state.lock().expect("face state mutex poisoned");
        matches!(*guard, FaceState::Running)
    }

    // Stopped -> Running; a second start is rejected.
    fn start(&self) -> Result<(), FaceError> {
        let mut guard = self.state.lock().expect("face state mutex poisoned");
        match *guard {
            FaceState::Running => Err(FaceError::AlreadyStarted),
            FaceState::Stopped => {
                *guard = FaceState::Running;
                Ok(())
            }
        }
    }

    // Running -> Stopped; shutting down a stopped face is rejected.
    fn shutdown(&self) -> Result<(), FaceError> {
        let mut guard = self.state.lock().expect("face state mutex poisoned");
        match *guard {
            FaceState::Stopped => Err(FaceError::NotStarted),
            FaceState::Running => {
                *guard = FaceState::Stopped;
                Ok(())
            }
        }
    }

    fn resource_count(&self) -> usize {
        self.store.len()
    }

    fn subscriber_count(&self) -> usize {
        self.store.subscriber_count()
    }

    fn snapshot(&self) -> Result<Vec<u8>, FaceError> {
        self.store.snapshot()
    }

    fn restore(&self, snapshot_bytes: &[u8]) -> Result<(), FaceError> {
        self.store.restore(snapshot_bytes)
    }

    fn apply_resource(&self, format: ResourceFormat, body: &[u8]) -> Result<(), FaceError> {
        self.store.apply(format, body)
    }

    fn get_resource(
        &self,
        reference: &ResourceRef,
        format: ResourceFormat,
    ) -> Result<Vec<u8>, FaceError> {
        self.store.get(reference, format)
    }

    fn list_resources(
        &self,
        kind: &str,
        namespace: Option<&str>,
        format: ResourceFormat,
    ) -> Result<Vec<Vec<u8>>, FaceError> {
        self.store.list(kind, namespace, format)
    }

    fn delete_resource(&self, reference: &ResourceRef) -> Result<(), FaceError> {
        self.store.delete(reference)
    }

    fn watch_resources(
        &self,
        kind: &str,
        namespace: Option<&str>,
        format: ResourceFormat,
    ) -> Result<Box<dyn FaceWatchStream>, FaceError> {
        self.store.watch(kind, namespace, format)
    }
}
/// Face for `FaceKind::Systemd`, backed by an `InMemoryStore`.
pub struct SystemdFace {
// Name copied from the declaration; returned by `Face::name`.
name: String,
// `user_units` flag taken from the declaration's kind.
user_units: bool,
// Lifecycle flag flipped by `start` / `shutdown`.
state: Mutex<FaceState>,
// Store that the resource, watch, counter, and snapshot calls delegate to.
store: crate::face_store::InMemoryStore,
}
impl SystemdFace {
    /// Builds a stopped face from `decl`, copying the `user_units` flag out
    /// of its kind.
    ///
    /// Returns `None` when the declaration's kind is not
    /// `FaceKind::Systemd`.
    #[must_use]
    pub fn from_declaration(decl: &FabricFace) -> Option<Self> {
        match &decl.kind {
            FaceKind::Systemd { user_units } => Some(Self {
                name: decl.name.clone(),
                user_units: *user_units,
                state: Mutex::new(FaceState::Stopped),
                store: crate::face_store::InMemoryStore::new(decl.name.clone()),
            }),
            _ => None,
        }
    }

    /// The `user_units` flag carried by the declaration.
    #[must_use]
    pub fn is_user_units(&self) -> bool {
        self.user_units
    }

    /// Installs `adapters` on the backing store and hands the face back,
    /// builder style.
    #[must_use]
    pub fn with_adapters(self, adapters: crate::format::AdapterRegistry) -> Self {
        let mut configured = self;
        configured.store.set_adapters(adapters);
        configured
    }
}
// `Face` for `SystemdFace`: lifecycle is tracked in `state`; every resource,
// counter, and snapshot operation is forwarded to the backing store.
impl Face for SystemdFace {
    fn name(&self) -> &str {
        self.name.as_str()
    }

    // Rebuilds the kind from the stored `user_units` flag.
    fn kind(&self) -> FaceKind {
        let user_units = self.user_units;
        FaceKind::Systemd { user_units }
    }

    fn is_running(&self) -> bool {
        let guard = self.state.lock().expect("face state mutex poisoned");
        matches!(*guard, FaceState::Running)
    }

    // Stopped -> Running; a second start is rejected.
    fn start(&self) -> Result<(), FaceError> {
        let mut guard = self.state.lock().expect("face state mutex poisoned");
        match *guard {
            FaceState::Running => Err(FaceError::AlreadyStarted),
            FaceState::Stopped => {
                *guard = FaceState::Running;
                Ok(())
            }
        }
    }

    // Running -> Stopped; shutting down a stopped face is rejected.
    fn shutdown(&self) -> Result<(), FaceError> {
        let mut guard = self.state.lock().expect("face state mutex poisoned");
        match *guard {
            FaceState::Stopped => Err(FaceError::NotStarted),
            FaceState::Running => {
                *guard = FaceState::Stopped;
                Ok(())
            }
        }
    }

    fn resource_count(&self) -> usize {
        self.store.len()
    }

    fn subscriber_count(&self) -> usize {
        self.store.subscriber_count()
    }

    fn snapshot(&self) -> Result<Vec<u8>, FaceError> {
        self.store.snapshot()
    }

    fn restore(&self, snapshot_bytes: &[u8]) -> Result<(), FaceError> {
        self.store.restore(snapshot_bytes)
    }

    fn apply_resource(&self, format: ResourceFormat, body: &[u8]) -> Result<(), FaceError> {
        self.store.apply(format, body)
    }

    fn get_resource(
        &self,
        reference: &ResourceRef,
        format: ResourceFormat,
    ) -> Result<Vec<u8>, FaceError> {
        self.store.get(reference, format)
    }

    fn list_resources(
        &self,
        kind: &str,
        namespace: Option<&str>,
        format: ResourceFormat,
    ) -> Result<Vec<Vec<u8>>, FaceError> {
        self.store.list(kind, namespace, format)
    }

    fn delete_resource(&self, reference: &ResourceRef) -> Result<(), FaceError> {
        self.store.delete(reference)
    }

    fn watch_resources(
        &self,
        kind: &str,
        namespace: Option<&str>,
        format: ResourceFormat,
    ) -> Result<Box<dyn FaceWatchStream>, FaceError> {
        self.store.watch(kind, namespace, format)
    }
}
/// Face for `FaceKind::BareMetalSupervisor`, backed by an `InMemoryStore`.
pub struct BareMetalSupervisorFace {
// Name copied from the declaration; returned by `Face::name`.
name: String,
// Lifecycle flag flipped by `start` / `shutdown`.
state: Mutex<FaceState>,
// Store that the resource, watch, counter, and snapshot calls delegate to.
store: crate::face_store::InMemoryStore,
}
impl BareMetalSupervisorFace {
    /// Builds a stopped face from `decl`.
    ///
    /// Returns `None` when the declaration's kind is not
    /// `FaceKind::BareMetalSupervisor`.
    #[must_use]
    pub fn from_declaration(decl: &FabricFace) -> Option<Self> {
        match &decl.kind {
            FaceKind::BareMetalSupervisor => Some(Self {
                name: decl.name.clone(),
                state: Mutex::new(FaceState::Stopped),
                store: crate::face_store::InMemoryStore::new(decl.name.clone()),
            }),
            _ => None,
        }
    }

    /// Installs `adapters` on the backing store and hands the face back,
    /// builder style.
    #[must_use]
    pub fn with_adapters(self, adapters: crate::format::AdapterRegistry) -> Self {
        let mut configured = self;
        configured.store.set_adapters(adapters);
        configured
    }
}
// `Face` for `BareMetalSupervisorFace`: lifecycle is tracked in `state`; every
// resource, counter, and snapshot operation is forwarded to the backing store.
impl Face for BareMetalSupervisorFace {
    fn name(&self) -> &str {
        self.name.as_str()
    }

    fn kind(&self) -> FaceKind {
        FaceKind::BareMetalSupervisor
    }

    fn is_running(&self) -> bool {
        let guard = self.state.lock().expect("face state mutex poisoned");
        matches!(*guard, FaceState::Running)
    }

    // Stopped -> Running; a second start is rejected.
    fn start(&self) -> Result<(), FaceError> {
        let mut guard = self.state.lock().expect("face state mutex poisoned");
        match *guard {
            FaceState::Running => Err(FaceError::AlreadyStarted),
            FaceState::Stopped => {
                *guard = FaceState::Running;
                Ok(())
            }
        }
    }

    // Running -> Stopped; shutting down a stopped face is rejected.
    fn shutdown(&self) -> Result<(), FaceError> {
        let mut guard = self.state.lock().expect("face state mutex poisoned");
        match *guard {
            FaceState::Stopped => Err(FaceError::NotStarted),
            FaceState::Running => {
                *guard = FaceState::Stopped;
                Ok(())
            }
        }
    }

    fn resource_count(&self) -> usize {
        self.store.len()
    }

    fn subscriber_count(&self) -> usize {
        self.store.subscriber_count()
    }

    fn snapshot(&self) -> Result<Vec<u8>, FaceError> {
        self.store.snapshot()
    }

    fn restore(&self, snapshot_bytes: &[u8]) -> Result<(), FaceError> {
        self.store.restore(snapshot_bytes)
    }

    fn apply_resource(&self, format: ResourceFormat, body: &[u8]) -> Result<(), FaceError> {
        self.store.apply(format, body)
    }

    fn get_resource(
        &self,
        reference: &ResourceRef,
        format: ResourceFormat,
    ) -> Result<Vec<u8>, FaceError> {
        self.store.get(reference, format)
    }

    fn list_resources(
        &self,
        kind: &str,
        namespace: Option<&str>,
        format: ResourceFormat,
    ) -> Result<Vec<Vec<u8>>, FaceError> {
        self.store.list(kind, namespace, format)
    }

    fn delete_resource(&self, reference: &ResourceRef) -> Result<(), FaceError> {
        self.store.delete(reference)
    }

    fn watch_resources(
        &self,
        kind: &str,
        namespace: Option<&str>,
        format: ResourceFormat,
    ) -> Result<Box<dyn FaceWatchStream>, FaceError> {
        self.store.watch(kind, namespace, format)
    }
}
/// Builds the boxed [`Face`] implementation that corresponds to `decl.kind`.
///
/// The match is exhaustive over `FaceKind`, and every arm succeeds, so this
/// function currently never returns `Err`.
///
/// # Panics
///
/// Panics if the constructor selected for the matched kind returns `None`.
/// Each constructor accepts exactly the kind its arm matches on, so this
/// would indicate a bug in this file.
pub fn instantiate(decl: &FabricFace) -> Result<Box<dyn Face>, FaceError> {
    const INVARIANT: &str = "kind matched, declaration must construct";

    let face: Box<dyn Face> = match &decl.kind {
        FaceKind::PureRaft => {
            Box::new(PureRaftFace::from_declaration(decl).expect(INVARIANT))
        }
        FaceKind::Kubernetes { .. } => {
            Box::new(KubernetesFace::from_declaration(decl).expect(INVARIANT))
        }
        FaceKind::Nomad { .. } => {
            Box::new(NomadFace::from_declaration(decl).expect(INVARIANT))
        }
        FaceKind::Systemd { .. } => {
            Box::new(SystemdFace::from_declaration(decl).expect(INVARIANT))
        }
        FaceKind::BareMetalSupervisor => {
            Box::new(BareMetalSupervisorFace::from_declaration(decl).expect(INVARIANT))
        }
    };
    Ok(face)
}
#[cfg(test)]
mod tests {
use super::*;
fn raft_decl() -> FabricFace {
FabricFace {
name: "pure-raft-test".into(),
kind: FaceKind::PureRaft,
}
}
fn k8s_decl() -> FabricFace {
FabricFace::prescribed_kubernetes_v1_34()
}
#[test]
fn pure_raft_constructs_from_matching_declaration() {
let face = PureRaftFace::from_declaration(&raft_decl());
assert!(face.is_some());
}
#[test]
fn pure_raft_rejects_non_matching_declaration() {
let face = PureRaftFace::from_declaration(&k8s_decl());
assert!(face.is_none());
}
#[test]
fn pure_raft_lifecycle_starts_then_stops() {
let face = PureRaftFace::from_declaration(&raft_decl()).unwrap();
assert!(!face.is_running());
assert_eq!(face.start(), Ok(()));
assert!(face.is_running());
assert_eq!(face.shutdown(), Ok(()));
assert!(!face.is_running());
}
#[test]
fn pure_raft_double_start_errors() {
let face = PureRaftFace::from_declaration(&raft_decl()).unwrap();
face.start().unwrap();
assert_eq!(face.start(), Err(FaceError::AlreadyStarted));
}
#[test]
fn pure_raft_shutdown_without_start_errors() {
let face = PureRaftFace::from_declaration(&raft_decl()).unwrap();
assert_eq!(face.shutdown(), Err(FaceError::NotStarted));
}
#[test]
fn kubernetes_constructs_from_matching_declaration() {
let face = KubernetesFace::from_declaration(&k8s_decl()).unwrap();
assert_eq!(face.version(), "1.34");
assert!(face.is_cncf_certified());
}
#[test]
fn kubernetes_rejects_non_matching_declaration() {
let face = KubernetesFace::from_declaration(&raft_decl());
assert!(face.is_none());
}
#[test]
fn kubernetes_lifecycle_starts_then_stops() {
let face = KubernetesFace::from_declaration(&k8s_decl()).unwrap();
assert!(!face.is_running());
face.start().unwrap();
assert!(face.is_running());
face.shutdown().unwrap();
assert!(!face.is_running());
}
#[test]
fn kubernetes_double_start_errors() {
let face = KubernetesFace::from_declaration(&k8s_decl()).unwrap();
face.start().unwrap();
assert_eq!(face.start(), Err(FaceError::AlreadyStarted));
}
#[test]
fn face_trait_is_object_safe_send_sync_static() {
fn assert_send_sync_static<T: Send + Sync + 'static>() {}
assert_send_sync_static::<Box<dyn Face>>();
}
#[test]
fn instantiate_pure_raft_returns_running_face() {
let face = instantiate(&raft_decl()).unwrap();
assert_eq!(face.name(), "pure-raft-test");
assert_eq!(face.kind(), FaceKind::PureRaft);
face.start().unwrap();
assert!(face.is_running());
}
#[test]
fn instantiate_kubernetes_returns_typed_face() {
let face = instantiate(&k8s_decl()).unwrap();
match face.kind() {
FaceKind::Kubernetes {
version,
certified_cncf,
} => {
assert_eq!(version, "1.34");
assert!(certified_cncf);
}
other => panic!("expected Kubernetes face, got {other:?}"),
}
}
fn nomad_decl() -> FabricFace {
FabricFace {
name: "nomad-1.7".into(),
kind: FaceKind::Nomad {
version: "1.7".into(),
},
}
}
#[test]
fn nomad_constructs_from_matching_declaration() {
let face = NomadFace::from_declaration(&nomad_decl()).unwrap();
assert_eq!(face.version(), "1.7");
assert_eq!(face.name(), "nomad-1.7");
}
#[test]
fn nomad_rejects_non_matching_declaration() {
let face = NomadFace::from_declaration(&k8s_decl());
assert!(face.is_none());
let face = NomadFace::from_declaration(&raft_decl());
assert!(face.is_none());
}
#[test]
fn nomad_lifecycle_starts_then_stops() {
let face = NomadFace::from_declaration(&nomad_decl()).unwrap();
assert!(!face.is_running());
face.start().unwrap();
assert!(face.is_running());
face.shutdown().unwrap();
assert!(!face.is_running());
}
#[test]
fn nomad_double_start_errors() {
let face = NomadFace::from_declaration(&nomad_decl()).unwrap();
face.start().unwrap();
assert_eq!(face.start(), Err(FaceError::AlreadyStarted));
}
#[test]
fn instantiate_nomad_returns_running_face() {
match instantiate(&nomad_decl()) {
Ok(face) => {
assert_eq!(face.name(), "nomad-1.7");
match face.kind() {
FaceKind::Nomad { version } => assert_eq!(version, "1.7"),
other => panic!("expected Nomad face, got {other:?}"),
}
}
Err(e) => panic!("Nomad face should construct, got error {e}"),
}
}
fn systemd_decl(user_units: bool) -> FabricFace {
FabricFace {
name: if user_units { "systemd-user" } else { "systemd-system" }.into(),
kind: FaceKind::Systemd { user_units },
}
}
#[test]
fn systemd_constructs_from_matching_declaration() {
let face = SystemdFace::from_declaration(&systemd_decl(true)).unwrap();
assert_eq!(face.name(), "systemd-user");
assert!(face.is_user_units());
}
#[test]
fn systemd_carries_user_vs_system_distinction() {
let user = SystemdFace::from_declaration(&systemd_decl(true)).unwrap();
let system = SystemdFace::from_declaration(&systemd_decl(false)).unwrap();
assert!(user.is_user_units());
assert!(!system.is_user_units());
}
#[test]
fn systemd_rejects_non_matching_declaration() {
assert!(SystemdFace::from_declaration(&k8s_decl()).is_none());
assert!(SystemdFace::from_declaration(&raft_decl()).is_none());
assert!(SystemdFace::from_declaration(&nomad_decl()).is_none());
}
#[test]
fn systemd_lifecycle_starts_then_stops() {
let face = SystemdFace::from_declaration(&systemd_decl(false)).unwrap();
face.start().unwrap();
assert!(face.is_running());
face.shutdown().unwrap();
assert!(!face.is_running());
}
#[test]
fn instantiate_systemd_returns_running_face() {
match instantiate(&systemd_decl(false)) {
Ok(face) => {
assert_eq!(face.name(), "systemd-system");
match face.kind() {
FaceKind::Systemd { user_units } => assert!(!user_units),
other => panic!("expected Systemd face, got {other:?}"),
}
}
Err(e) => panic!("Systemd face should construct, got error {e}"),
}
}
fn bms_decl() -> FabricFace {
FabricFace {
name: "bms-test".into(),
kind: FaceKind::BareMetalSupervisor,
}
}
#[test]
fn bms_constructs_from_matching_declaration() {
let face = BareMetalSupervisorFace::from_declaration(&bms_decl()).unwrap();
assert_eq!(face.name(), "bms-test");
assert_eq!(face.kind(), FaceKind::BareMetalSupervisor);
}
#[test]
fn bms_rejects_non_matching_declaration() {
assert!(BareMetalSupervisorFace::from_declaration(&k8s_decl()).is_none());
assert!(BareMetalSupervisorFace::from_declaration(&raft_decl()).is_none());
assert!(BareMetalSupervisorFace::from_declaration(&nomad_decl()).is_none());
}
#[test]
fn bms_lifecycle_starts_then_stops() {
let face = BareMetalSupervisorFace::from_declaration(&bms_decl()).unwrap();
face.start().unwrap();
assert!(face.is_running());
face.shutdown().unwrap();
assert!(!face.is_running());
}
#[test]
fn instantiate_bare_metal_supervisor_now_returns_running_face() {
match instantiate(&bms_decl()) {
Ok(face) => {
assert_eq!(face.name(), "bms-test");
assert_eq!(face.kind(), FaceKind::BareMetalSupervisor);
}
Err(e) => panic!("BMS face should construct, got error {e}"),
}
}
#[test]
fn instantiate_covers_every_face_kind_with_no_unsupported_arm() {
let kinds = vec![
FaceKind::PureRaft,
FaceKind::Kubernetes {
version: "1.34".into(),
certified_cncf: true,
},
FaceKind::Nomad { version: "1.7".into() },
FaceKind::Systemd { user_units: false },
FaceKind::BareMetalSupervisor,
];
for kind in kinds {
let decl = FabricFace {
name: format!("{kind:?}"),
kind,
};
match instantiate(&decl) {
Ok(_) => {}
Err(e) => panic!("FaceKind {:?} failed to instantiate: {e}", decl.kind),
}
}
}
#[test]
fn five_faces_compose_in_a_single_vec() {
let faces: Vec<Box<dyn Face>> = vec![
Box::new(PureRaftFace::from_declaration(&raft_decl()).unwrap()),
Box::new(KubernetesFace::from_declaration(&k8s_decl()).unwrap()),
Box::new(NomadFace::from_declaration(&nomad_decl()).unwrap()),
Box::new(SystemdFace::from_declaration(&systemd_decl(false)).unwrap()),
Box::new(BareMetalSupervisorFace::from_declaration(&bms_decl()).unwrap()),
];
assert_eq!(faces.len(), 5);
for face in &faces {
assert!(!face.is_running());
}
}
fn yaml_manifest(name: &str, ns: &str) -> Vec<u8> {
format!(
"apiVersion: v1\nkind: Pod\nmetadata:\n name: {name}\n namespace: {ns}\nspec:\n containers:\n - name: c\n image: nginx\n"
)
.into_bytes()
}
#[test]
fn kubernetes_face_apply_get_yaml_round_trips() {
let face = KubernetesFace::from_declaration(&k8s_decl()).unwrap();
let yaml = yaml_manifest("nginx", "default");
face.apply_resource(ResourceFormat::Yaml, &yaml).unwrap();
let r = ResourceRef::namespaced("Pod", "nginx", "default");
let back = face.get_resource(&r, ResourceFormat::Yaml).unwrap();
assert_eq!(back, yaml);
}
#[test]
fn nomad_face_apply_get_json_round_trips() {
let face = NomadFace::from_declaration(&nomad_decl()).unwrap();
let json = serde_json::to_vec(&serde_json::json!({
"apiVersion": "nomad.io/v1",
"kind": "Job",
"metadata": { "name": "web", "namespace": "global" }
}))
.unwrap();
face.apply_resource(ResourceFormat::Json, &json).unwrap();
let r = ResourceRef::namespaced("Job", "web", "global");
let back = face.get_resource(&r, ResourceFormat::Json).unwrap();
assert_eq!(back, json);
}
#[test]
fn systemd_face_list_aggregates_applied_resources() {
let face = SystemdFace::from_declaration(&systemd_decl(false)).unwrap();
let y1 = yaml_manifest("a", "default");
let y2 = yaml_manifest("b", "default");
face.apply_resource(ResourceFormat::Yaml, &y1).unwrap();
face.apply_resource(ResourceFormat::Yaml, &y2).unwrap();
let listed = face
.list_resources("Pod", Some("default"), ResourceFormat::Yaml)
.unwrap();
assert_eq!(listed.len(), 2);
}
#[test]
fn bms_face_watch_streams_events() {
let face = BareMetalSupervisorFace::from_declaration(&bms_decl()).unwrap();
let mut watch = face
.watch_resources("Pod", Some("default"), ResourceFormat::Yaml)
.unwrap();
let yaml = yaml_manifest("nginx", "default");
face.apply_resource(ResourceFormat::Yaml, &yaml).unwrap();
let ev = watch.next_event().unwrap().expect("event");
assert_eq!(ev.kind, FaceWatchEventKind::Added);
}
#[test]
fn delete_resource_missing_errors_uniformly_across_faces() {
let r = ResourceRef::namespaced("Pod", "missing", "default");
let k = KubernetesFace::from_declaration(&k8s_decl()).unwrap();
let n = NomadFace::from_declaration(&nomad_decl()).unwrap();
let s = SystemdFace::from_declaration(&systemd_decl(false)).unwrap();
let b = BareMetalSupervisorFace::from_declaration(&bms_decl()).unwrap();
let p = PureRaftFace::from_declaration(&raft_decl()).unwrap();
for (label, result) in [
("k8s", k.delete_resource(&r)),
("nomad", n.delete_resource(&r)),
("systemd", s.delete_resource(&r)),
("bms", b.delete_resource(&r)),
("pure-raft", p.delete_resource(&r)),
] {
match result {
Err(FaceError::Unsupported(msg)) => {
assert!(msg.contains("no resource"), "{label}: msg: {msg}");
}
other => panic!("{label}: expected Unsupported, got {other:?}"),
}
}
}
#[test]
fn all_five_faces_apply_get_uniform_across_yaml() {
let faces: Vec<Box<dyn Face>> = vec![
Box::new(PureRaftFace::from_declaration(&raft_decl()).unwrap()),
Box::new(KubernetesFace::from_declaration(&k8s_decl()).unwrap()),
Box::new(NomadFace::from_declaration(&nomad_decl()).unwrap()),
Box::new(SystemdFace::from_declaration(&systemd_decl(false)).unwrap()),
Box::new(BareMetalSupervisorFace::from_declaration(&bms_decl()).unwrap()),
];
let yaml = yaml_manifest("nginx", "default");
let r = ResourceRef::namespaced("Pod", "nginx", "default");
for face in &faces {
face.apply_resource(ResourceFormat::Yaml, &yaml).unwrap();
let back = face.get_resource(&r, ResourceFormat::Yaml).unwrap();
assert_eq!(back, yaml, "face {} should round-trip YAML", face.name());
}
}
fn raft_face() -> PureRaftFace {
PureRaftFace::from_declaration(&raft_decl()).unwrap()
}
fn pod_ref(name: &str, ns: &str) -> ResourceRef {
ResourceRef::namespaced("Pod", name, ns)
}
fn envelope(reference: &ResourceRef, payload: &[u8]) -> Vec<u8> {
encode_native_envelope(reference, payload).expect("envelope encode")
}
#[test]
fn pure_raft_apply_then_get_round_trips_envelope() {
let face = raft_face();
let r = pod_ref("nginx", "default");
let env = envelope(&r, b"my-payload");
face.apply_resource(ResourceFormat::Native, &env).unwrap();
let got = face
.get_resource(&r, ResourceFormat::Native)
.expect("get after apply");
assert_eq!(got, env);
}
#[test]
fn pure_raft_apply_yaml_now_works_via_adapter_registry() {
let face = raft_face();
let yaml = b"apiVersion: v1\nkind: Pod\nmetadata:\n name: nginx\n namespace: default\nspec:\n containers:\n - name: c\n image: nginx\n";
face.apply_resource(ResourceFormat::Yaml, yaml)
.expect("YAML apply should succeed via K8sYamlAdapter");
let r = ResourceRef::namespaced("Pod", "nginx", "default");
let back = face
.get_resource(&r, ResourceFormat::Yaml)
.expect("YAML get should succeed");
assert_eq!(back, yaml);
}
#[test]
fn pure_raft_apply_json_works_via_adapter_registry() {
let face = raft_face();
let json = serde_json::to_vec(&serde_json::json!({
"apiVersion": "v1",
"kind": "Pod",
"metadata": { "name": "nginx", "namespace": "default" },
"spec": { "containers": [{"name": "c", "image": "nginx"}] }
}))
.unwrap();
face.apply_resource(ResourceFormat::Json, &json)
.expect("JSON apply should succeed");
let r = ResourceRef::namespaced("Pod", "nginx", "default");
let back = face
.get_resource(&r, ResourceFormat::Json)
.expect("JSON get should succeed");
assert_eq!(back, json);
}
#[test]
fn pure_raft_with_custom_adapter_registry_overrides_default() {
let face = PureRaftFace::from_declaration(&raft_decl())
.unwrap()
.with_adapters({
let mut r = crate::format::AdapterRegistry::empty();
r.register(std::sync::Arc::new(
crate::format::NativePassthroughAdapter,
));
r
});
let r = pod_ref("nginx", "default");
match face.apply_resource(ResourceFormat::Yaml, b"apiVersion: v1\nkind: Pod\nmetadata: {name: x}\n") {
Err(FaceError::Unsupported(msg)) => {
assert!(msg.contains("Yaml") || msg.contains("UnsupportedFormat"), "msg: {msg}");
}
other => panic!("expected Unsupported, got {other:?}"),
}
face.apply_resource(ResourceFormat::Native, &envelope(&r, b"x"))
.expect("Native still works");
}
#[test]
fn pure_raft_invalid_yaml_apply_returns_clear_parse_error() {
let face = raft_face();
let yaml = b"apiVersion: v1\nmetadata:\n name: nginx\n";
match face.apply_resource(ResourceFormat::Yaml, yaml) {
Err(FaceError::Unsupported(msg)) => {
assert!(msg.contains("kind"), "msg should mention missing kind: {msg}");
}
other => panic!("expected Unsupported (MissingField kind), got {other:?}"),
}
}
/// Two Pods applied as YAML must come back from a YAML listing as exactly
/// the two original documents; the comparison ignores listing order.
#[test]
fn pure_raft_list_yaml_returns_each_envelope_as_yaml() {
    let face = raft_face();
    let yaml_a = b"apiVersion: v1\nkind: Pod\nmetadata:\n name: a\n namespace: default\n";
    let yaml_b = b"apiVersion: v1\nkind: Pod\nmetadata:\n name: b\n namespace: default\n";
    face.apply_resource(ResourceFormat::Yaml, yaml_a).unwrap();
    face.apply_resource(ResourceFormat::Yaml, yaml_b).unwrap();

    let listed = face
        .list_resources("Pod", Some("default"), ResourceFormat::Yaml)
        .unwrap();
    assert_eq!(listed.len(), 2);

    // Sort both sides so the assertion does not depend on listing order.
    let mut actual: Vec<&[u8]> = Vec::with_capacity(listed.len());
    for doc in &listed {
        actual.push(doc.as_slice());
    }
    actual.sort();

    let mut expected: Vec<&[u8]> = vec![&yaml_a[..], &yaml_b[..]];
    expected.sort();

    assert_eq!(actual, expected);
}
/// Fetching a resource that was never applied must yield
/// `FaceError::Unsupported` with a message containing "no resource".
#[test]
fn pure_raft_get_missing_resource_errors() {
    let face = raft_face();
    let absent = pod_ref("does-not-exist", "default");
    let lookup = face.get_resource(&absent, ResourceFormat::Native);
    match lookup {
        Err(FaceError::Unsupported(msg)) => {
            assert!(msg.contains("no resource"), "msg: {msg}");
        }
        other => panic!("expected Unsupported (no resource), got {other:?}"),
    }
}
/// Re-applying an existing resource must replace the stored body and emit a
/// `Modified` watch event carrying the new body.
///
/// The watch is opened after the first apply so that the only replayed event
/// is the `Added` for `v1`; the next event on the stream is then the one
/// produced by the second apply.
#[test]
fn pure_raft_apply_updates_existing_with_modified_event() {
    let face = raft_face();
    let r = pod_ref("nginx", "default");
    let v1 = envelope(&r, b"v1");
    let v2 = envelope(&r, b"v2");
    face.apply_resource(ResourceFormat::Native, &v1).unwrap();

    // Subscribe once the resource exists and drain the replayed state.
    let mut watch = face
        .watch_resources("Pod", Some("default"), ResourceFormat::Native)
        .unwrap();
    let replay = watch.next_event().unwrap().expect("replay");
    assert_eq!(replay.kind, FaceWatchEventKind::Added);
    assert_eq!(replay.body, v1);

    // Second apply targets the same reference, so it is an update.
    face.apply_resource(ResourceFormat::Native, &v2).unwrap();

    // The stored value reflects the update...
    let got = face.get_resource(&r, ResourceFormat::Native).unwrap();
    assert_eq!(got, v2);

    // ...and the watcher observes it as a modification, not a fresh add.
    let modified = watch.next_event().unwrap().expect("modified event");
    assert_eq!(modified.kind, FaceWatchEventKind::Modified);
    assert_eq!(modified.body, v2);
}
/// Listing Pods in `default` must return the two Pods applied there and
/// leave out the Pod applied in the `other` namespace.
#[test]
fn pure_raft_list_returns_all_of_kind_in_namespace() {
    let face = raft_face();
    let env_a = envelope(&pod_ref("a", "default"), b"A");
    let env_b = envelope(&pod_ref("b", "default"), b"B");
    let env_c = envelope(&pod_ref("c", "other"), b"C");
    for env in [&env_a, &env_b, &env_c] {
        face.apply_resource(ResourceFormat::Native, env).unwrap();
    }

    let mut listed: Vec<Vec<u8>> = face
        .list_resources("Pod", Some("default"), ResourceFormat::Native)
        .unwrap();
    assert_eq!(listed.len(), 2);

    // Compare as sorted sets; the listing order is not part of the check.
    listed.sort();
    let mut expected = vec![env_a, env_b];
    expected.sort();
    assert_eq!(listed, expected);
}
/// After apply followed by delete, a get on the same reference must fail
/// with `FaceError::Unsupported` mentioning "no resource".
#[test]
fn pure_raft_delete_removes_then_get_errors() {
    let face = raft_face();
    let target = pod_ref("nginx", "default");
    let body = envelope(&target, b"x");
    face.apply_resource(ResourceFormat::Native, &body)
        .unwrap();
    face.delete_resource(&target).unwrap();

    let lookup = face.get_resource(&target, ResourceFormat::Native);
    match lookup {
        Err(FaceError::Unsupported(msg)) => {
            assert!(msg.contains("no resource"), "msg: {msg}");
        }
        other => panic!("expected Unsupported after delete, got {other:?}"),
    }
}
/// Deleting a reference that was never applied must fail with
/// `FaceError::Unsupported` mentioning "no resource".
#[test]
fn pure_raft_delete_missing_resource_errors() {
    let face = raft_face();
    let absent = pod_ref("missing", "default");
    let outcome = face.delete_resource(&absent);
    match outcome {
        Err(FaceError::Unsupported(msg)) => {
            assert!(msg.contains("no resource"), "msg: {msg}");
        }
        other => panic!("expected Unsupported, got {other:?}"),
    }
}
/// A watch opened after two Pods already exist must deliver two `Added`
/// events whose bodies are those two envelopes, in any order.
#[test]
fn pure_raft_watch_replays_existing_state_as_added() {
    let face = raft_face();
    let env_a = envelope(&pod_ref("a", "default"), b"A");
    let env_b = envelope(&pod_ref("b", "default"), b"B");
    face.apply_resource(ResourceFormat::Native, &env_a).unwrap();
    face.apply_resource(ResourceFormat::Native, &env_b).unwrap();

    let mut stream = face
        .watch_resources("Pod", Some("default"), ResourceFormat::Native)
        .unwrap();

    // Pull exactly two events, checking each kind as it arrives.
    let mut bodies: Vec<Vec<u8>> = (0..2)
        .map(|_| {
            let event = stream.next_event().unwrap().expect("event");
            assert_eq!(event.kind, FaceWatchEventKind::Added);
            event.body
        })
        .collect();
    bodies.sort();

    let mut expected = vec![env_a, env_b];
    expected.sort();
    assert_eq!(bodies, expected);
}
/// A watcher that has consumed the initial replay must next see a
/// `Modified` event and then a `Deleted` event when another thread updates
/// and then deletes the watched resource.
#[test]
fn pure_raft_watch_streams_modified_then_deleted() {
    use std::sync::Arc;
    use std::thread;

    let shared_face = Arc::new(raft_face());
    let target = pod_ref("nginx", "default");
    let first = envelope(&target, b"v1");
    let second = envelope(&target, b"v2");
    shared_face
        .apply_resource(ResourceFormat::Native, &first)
        .unwrap();

    let mut stream = shared_face
        .watch_resources("Pod", Some("default"), ResourceFormat::Native)
        .unwrap();

    // The pre-existing resource is replayed first.
    let replayed = stream.next_event().unwrap().expect("replay");
    assert_eq!(replayed.kind, FaceWatchEventKind::Added);
    assert_eq!(replayed.body, first);

    // The writer thread owns its own handle plus copies of what it needs.
    let writer = {
        let face = Arc::clone(&shared_face);
        let target = target.clone();
        let body = second.clone();
        thread::spawn(move || {
            face.apply_resource(ResourceFormat::Native, &body)
                .unwrap();
            face.delete_resource(&target).unwrap();
        })
    };

    let modified = stream.next_event().unwrap().expect("mod");
    assert_eq!(modified.kind, FaceWatchEventKind::Modified);
    assert_eq!(modified.body, second);

    // The delete event is expected to carry the last stored body.
    let deleted = stream.next_event().unwrap().expect("del");
    assert_eq!(deleted.kind, FaceWatchEventKind::Deleted);
    assert_eq!(deleted.body, second);

    writer.join().unwrap();
}
/// A watch on kind "Pod" must skip a Service apply: the first event it
/// yields is the Pod applied afterwards.
#[test]
fn pure_raft_watch_filters_by_kind() {
    let face = raft_face();
    let mut pod_stream = face
        .watch_resources("Pod", None, ResourceFormat::Native)
        .unwrap();

    // Noise: a resource of a different kind.
    let service_ref = ResourceRef::namespaced("Service", "frontend", "default");
    let service_env = envelope(&service_ref, b"S");
    face.apply_resource(ResourceFormat::Native, &service_env)
        .unwrap();

    // Signal: the Pod the watcher should report.
    let pod_env = envelope(&pod_ref("nginx", "default"), b"P");
    face.apply_resource(ResourceFormat::Native, &pod_env).unwrap();

    let first_seen = pod_stream.next_event().unwrap().expect("pod event");
    assert_eq!(first_seen.body, pod_env);
}
/// A watch scoped to namespace `default` must skip a Pod applied in
/// `other`: the first event it yields is the Pod applied in `default`.
#[test]
fn pure_raft_watch_filters_by_namespace() {
    let face = raft_face();
    let mut stream = face
        .watch_resources("Pod", Some("default"), ResourceFormat::Native)
        .unwrap();

    // Noise: same kind, different namespace.
    let foreign_env = envelope(&pod_ref("a", "other"), b"O");
    face.apply_resource(ResourceFormat::Native, &foreign_env)
        .unwrap();

    // Signal: the Pod inside the watched namespace.
    let default_env = envelope(&pod_ref("b", "default"), b"D");
    face.apply_resource(ResourceFormat::Native, &default_env).unwrap();

    let first_seen = stream.next_event().unwrap().expect("event");
    assert_eq!(first_seen.body, default_env);
}
/// Two independent Pod watches opened on the same face must each receive
/// the event for a single subsequent apply.
#[test]
fn pure_raft_watch_multiple_subscribers_all_receive_events() {
    let face = raft_face();
    let subscribe = || {
        face.watch_resources("Pod", None, ResourceFormat::Native)
            .unwrap()
    };
    let mut first_watch = subscribe();
    let mut second_watch = subscribe();

    let pod_env = envelope(&pod_ref("nginx", "default"), b"x");
    face.apply_resource(ResourceFormat::Native, &pod_env).unwrap();

    // Read from both streams before asserting on either.
    let seen_by_first = first_watch.next_event().unwrap().expect("w1 event");
    let seen_by_second = second_watch.next_event().unwrap().expect("w2 event");
    assert_eq!(seen_by_first.body, pod_env);
    assert_eq!(seen_by_second.body, pod_env);
}
/// Bytes produced by `encode_native_envelope` must be accepted by a Native
/// apply and returned unchanged by a Native get.
#[test]
fn encode_native_envelope_round_trips_through_apply() {
    let face = raft_face();
    let target = pod_ref("nginx", "default");
    let encoded = encode_native_envelope(&target, b"payload").unwrap();
    face.apply_resource(ResourceFormat::Native, &encoded).unwrap();

    let fetched = face.get_resource(&target, ResourceFormat::Native).unwrap();
    assert_eq!(fetched, encoded);
}
/// `ResourceRef::cluster_scoped` stores the given kind and name and leaves
/// `namespace` as `None`.
#[test]
fn resource_ref_cluster_scoped_has_no_namespace() {
    let cluster_ref = ResourceRef::cluster_scoped("Namespace", "default");
    assert_eq!(cluster_ref.kind, "Namespace");
    assert_eq!(cluster_ref.name, "default");
    assert_eq!(cluster_ref.namespace, None);
}
/// `ResourceRef::namespaced` stores the given kind and name and wraps the
/// namespace argument in `Some`.
#[test]
fn resource_ref_namespaced_carries_namespace() {
    let scoped_ref = ResourceRef::namespaced("Pod", "nginx", "default");
    assert_eq!(scoped_ref.kind, "Pod");
    assert_eq!(scoped_ref.name, "nginx");
    assert_eq!(scoped_ref.namespace, Some("default".into()));
}
/// `ResourceRef` can be stored in a `HashSet`, and two references built
/// from identical arguments collapse into a single entry.
#[test]
fn resource_ref_is_hashable() {
    use std::collections::HashSet;

    // Three insertions, two of which are the same reference.
    let unique: HashSet<ResourceRef> = vec![
        ResourceRef::namespaced("Pod", "a", "default"),
        ResourceRef::namespaced("Pod", "a", "default"),
        ResourceRef::namespaced("Pod", "b", "default"),
    ]
    .into_iter()
    .collect();

    assert_eq!(unique.len(), 2);
}
/// Four different concrete face types can be held together as
/// `Box<dyn Face>`, and each reports the `FaceKind` variant matching its
/// type.
#[test]
fn four_faces_compose_in_a_single_vec() {
    /// Maps a `FaceKind` to a fixed label, ignoring any variant payload.
    fn label(kind: FaceKind) -> &'static str {
        match kind {
            FaceKind::PureRaft => "PureRaft",
            FaceKind::Kubernetes { .. } => "Kubernetes",
            FaceKind::Nomad { .. } => "Nomad",
            FaceKind::Systemd { .. } => "Systemd",
            FaceKind::BareMetalSupervisor => "BareMetalSupervisor",
        }
    }

    let faces: Vec<Box<dyn Face>> = vec![
        Box::new(PureRaftFace::from_declaration(&raft_decl()).unwrap()),
        Box::new(KubernetesFace::from_declaration(&k8s_decl()).unwrap()),
        Box::new(NomadFace::from_declaration(&nomad_decl()).unwrap()),
        Box::new(SystemdFace::from_declaration(&systemd_decl(false)).unwrap()),
    ];
    assert_eq!(faces.len(), 4);

    let mut kinds: Vec<&'static str> = Vec::with_capacity(faces.len());
    for face in &faces {
        kinds.push(label(face.kind()));
    }
    assert_eq!(
        kinds,
        vec!["PureRaft", "Kubernetes", "Nomad", "Systemd"],
    );
}
/// Three different concrete face types held as `Box<dyn Face>` report the
/// expected names, and none of them is running right after construction.
#[test]
fn three_faces_compose_in_a_single_vec() {
    let faces: Vec<Box<dyn Face>> = vec![
        Box::new(PureRaftFace::from_declaration(&raft_decl()).unwrap()),
        Box::new(KubernetesFace::from_declaration(&k8s_decl()).unwrap()),
        Box::new(NomadFace::from_declaration(&nomad_decl()).unwrap()),
    ];
    assert_eq!(faces.len(), 3);

    let mut names: Vec<&str> = Vec::with_capacity(faces.len());
    for face in &faces {
        names.push(face.name());
    }
    assert_eq!(
        names,
        vec!["pure-raft-test", "k8s-v1.34", "nomad-1.7"],
    );

    // `start` was never called on any of them.
    faces.iter().for_each(|face| {
        assert!(!face.is_running());
    });
}
/// Concrete faces coerce to `Box<dyn Face>` and remain usable through the
/// trait object; neither reports itself as running.
#[test]
fn faces_can_be_carried_as_boxed_trait_objects() {
    let raft: Box<dyn Face> =
        Box::new(PureRaftFace::from_declaration(&raft_decl()).unwrap());
    let k8s: Box<dyn Face> =
        Box::new(KubernetesFace::from_declaration(&k8s_decl()).unwrap());
    let faces = vec![raft, k8s];
    assert_eq!(faces.len(), 2);

    faces.iter().for_each(|face| {
        assert!(!face.is_running());
    });
}
/// A face built by `instantiate` from a declaration reports the same kind
/// that the declaration carries.
#[test]
fn round_trip_declaration_through_face_and_back() {
    let declaration = k8s_decl();
    let built = instantiate(&declaration).unwrap();
    assert_eq!(built.kind(), declaration.kind);
}
}