use crate::backend::StoreBackend;
use crate::face::{FaceError, FaceWatchStream, ResourceFormat, ResourceRef};
use crate::face_store::InMemoryStore;
use crate::format::AdapterRegistry;
/// Settings carried by a [`RaftBackend`].
///
/// Within this file only `node_id` is read (to name the backing store);
/// `peers` and `log_dir` are stored and handed back through
/// [`RaftBackend::config`] but are not otherwise consumed here.
#[derive(Debug, Clone)]
pub struct RaftConfig {
    /// Numeric identifier of this node. `RaftBackend::new` formats it into
    /// the store name `raft-node-<node_id>`.
    pub node_id: u64,
    /// Other cluster members — presumably `host:port` addresses.
    /// TODO(review): confirm the expected format with the eventual consumer.
    pub peers: Vec<String>,
    /// Presumably the directory holding the Raft log; nothing in this file
    /// reads it yet.
    pub log_dir: std::path::PathBuf,
}
/// Backend reported under the name `"openraft"`.
///
/// In the code visible here it is a thin wrapper: all storage operations are
/// served by the embedded [`InMemoryStore`], and the configuration is only
/// retained for inspection.
pub struct RaftBackend {
    /// Configuration supplied at construction time.
    config: RaftConfig,
    /// Store that actually holds the resources.
    store: InMemoryStore,
}
impl RaftBackend {
    /// Creates a backend whose store is named `raft-node-<node_id>` and which
    /// starts out with `AdapterRegistry::default()` installed.
    #[must_use]
    pub fn new(config: RaftConfig) -> Self {
        // The name must be built before `config` is moved into the struct.
        let store = InMemoryStore::new(format!("raft-node-{}", config.node_id));
        Self { config, store }.with_adapters(AdapterRegistry::default())
    }

    /// Builder-style setter: passes `adapters` to the wrapped store's
    /// `set_adapters` and returns the backend for further chaining.
    #[must_use]
    pub fn with_adapters(mut self, adapters: AdapterRegistry) -> Self {
        self.store.set_adapters(adapters);
        self
    }

    /// Borrows the configuration this backend was constructed with.
    #[must_use]
    pub fn config(&self) -> &RaftConfig {
        &self.config
    }
}
/// [`StoreBackend`] for [`RaftBackend`]: every operation except `name` is
/// forwarded unchanged to the wrapped [`InMemoryStore`].
impl StoreBackend for RaftBackend {
    /// Fixed identifier of this backend kind.
    fn name(&self) -> &str {
        const BACKEND_NAME: &str = "openraft";
        BACKEND_NAME
    }

    /// Forwards to the inner store's `apply`.
    fn apply(&self, format: ResourceFormat, body: &[u8]) -> Result<(), FaceError> {
        let Self { store, .. } = self;
        store.apply(format, body)
    }

    /// Forwards to the inner store's `get`.
    fn get(
        &self,
        reference: &ResourceRef,
        format: ResourceFormat,
    ) -> Result<Vec<u8>, FaceError> {
        let Self { store, .. } = self;
        store.get(reference, format)
    }

    /// Forwards to the inner store's `list`.
    fn list(
        &self,
        kind: &str,
        namespace: Option<&str>,
        format: ResourceFormat,
    ) -> Result<Vec<Vec<u8>>, FaceError> {
        let Self { store, .. } = self;
        store.list(kind, namespace, format)
    }

    /// Forwards to the inner store's `delete`.
    fn delete(&self, reference: &ResourceRef) -> Result<(), FaceError> {
        let Self { store, .. } = self;
        store.delete(reference)
    }

    /// Forwards to the inner store's `watch`.
    fn watch(
        &self,
        kind: &str,
        namespace: Option<&str>,
        format: ResourceFormat,
    ) -> Result<Box<dyn FaceWatchStream>, FaceError> {
        let Self { store, .. } = self;
        store.watch(kind, namespace, format)
    }

    /// Reports the inner store's `len()`.
    fn resource_count(&self) -> usize {
        let Self { store, .. } = self;
        store.len()
    }

    /// Reports the inner store's `subscriber_count()`.
    fn subscriber_count(&self) -> usize {
        let Self { store, .. } = self;
        store.subscriber_count()
    }

    /// Forwards to the inner store's `snapshot`.
    fn snapshot(&self) -> Result<Vec<u8>, FaceError> {
        let Self { store, .. } = self;
        store.snapshot()
    }

    /// Forwards to the inner store's `restore`.
    fn restore(&self, snapshot_bytes: &[u8]) -> Result<(), FaceError> {
        let Self { store, .. } = self;
        store.restore(snapshot_bytes)
    }
}
/// Settings carried by a [`KubeApiServerBackend`].
///
/// Within this file only `api_version` is read (to name the backing store);
/// the remaining fields are stored and handed back through
/// [`KubeApiServerBackend::config`].
///
/// `Debug` is implemented by hand so that `bearer_token` is never written to
/// logs or panic messages.
#[derive(Clone)]
pub struct KubeApiServerConfig {
    /// Presumably the API server URL (the tests use `https://host:6443`).
    pub endpoint: String,
    /// Optional path to a kubeconfig file; not read by the code in this file.
    pub kubeconfig: Option<std::path::PathBuf>,
    /// Optional bearer credential. Treated as a secret: redacted in `Debug`.
    pub bearer_token: Option<String>,
    /// Version string; `KubeApiServerBackend::new` formats it into the store
    /// name `kube-<api_version>`.
    pub api_version: String,
}

impl std::fmt::Debug for KubeApiServerConfig {
    /// Same layout as the derived output, except that a present
    /// `bearer_token` is shown as `Some("<redacted>")` instead of its value.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Keep the Some/None distinction visible without exposing the secret.
        let token = self.bearer_token.as_ref().map(|_| "<redacted>");
        f.debug_struct("KubeApiServerConfig")
            .field("endpoint", &self.endpoint)
            .field("kubeconfig", &self.kubeconfig)
            .field("bearer_token", &token)
            .field("api_version", &self.api_version)
            .finish()
    }
}
/// Backend reported under the name `"kube-apiserver"`.
///
/// In the code visible here it is a thin wrapper: all storage operations are
/// served by the embedded [`InMemoryStore`], and the configuration is only
/// retained for inspection.
pub struct KubeApiServerBackend {
    /// Configuration supplied at construction time.
    config: KubeApiServerConfig,
    /// Store that actually holds the resources.
    store: InMemoryStore,
}
impl KubeApiServerBackend {
#[must_use]
pub fn new(config: KubeApiServerConfig) -> Self {
let face_name = format!("kube-{}", config.api_version);
let mut store = InMemoryStore::new(face_name);
store.set_adapters(AdapterRegistry::default());
Self { config, store }
}
#[must_use]
pub fn config(&self) -> &KubeApiServerConfig {
&self.config
}
}
/// [`StoreBackend`] for [`KubeApiServerBackend`]: every operation except
/// `name` is forwarded unchanged to the wrapped [`InMemoryStore`].
impl StoreBackend for KubeApiServerBackend {
    /// Fixed identifier of this backend kind.
    fn name(&self) -> &str {
        const BACKEND_NAME: &str = "kube-apiserver";
        BACKEND_NAME
    }

    /// Forwards to the inner store's `apply`.
    fn apply(&self, format: ResourceFormat, body: &[u8]) -> Result<(), FaceError> {
        let Self { store, .. } = self;
        store.apply(format, body)
    }

    /// Forwards to the inner store's `get`.
    fn get(
        &self,
        reference: &ResourceRef,
        format: ResourceFormat,
    ) -> Result<Vec<u8>, FaceError> {
        let Self { store, .. } = self;
        store.get(reference, format)
    }

    /// Forwards to the inner store's `list`.
    fn list(
        &self,
        kind: &str,
        namespace: Option<&str>,
        format: ResourceFormat,
    ) -> Result<Vec<Vec<u8>>, FaceError> {
        let Self { store, .. } = self;
        store.list(kind, namespace, format)
    }

    /// Forwards to the inner store's `delete`.
    fn delete(&self, reference: &ResourceRef) -> Result<(), FaceError> {
        let Self { store, .. } = self;
        store.delete(reference)
    }

    /// Forwards to the inner store's `watch`.
    fn watch(
        &self,
        kind: &str,
        namespace: Option<&str>,
        format: ResourceFormat,
    ) -> Result<Box<dyn FaceWatchStream>, FaceError> {
        let Self { store, .. } = self;
        store.watch(kind, namespace, format)
    }

    /// Reports the inner store's `len()`.
    fn resource_count(&self) -> usize {
        let Self { store, .. } = self;
        store.len()
    }

    /// Reports the inner store's `subscriber_count()`.
    fn subscriber_count(&self) -> usize {
        let Self { store, .. } = self;
        store.subscriber_count()
    }

    /// Forwards to the inner store's `snapshot`.
    fn snapshot(&self) -> Result<Vec<u8>, FaceError> {
        let Self { store, .. } = self;
        store.snapshot()
    }

    /// Forwards to the inner store's `restore`.
    fn restore(&self, snapshot_bytes: &[u8]) -> Result<(), FaceError> {
        let Self { store, .. } = self;
        store.restore(snapshot_bytes)
    }
}
/// Settings carried by a [`NomadHttpBackend`].
///
/// Within this file only `region` is read (to name the backing store); the
/// remaining fields are stored and handed back through
/// [`NomadHttpBackend::config`].
///
/// `Debug` is implemented by hand so that `token` is never written to logs
/// or panic messages.
#[derive(Clone)]
pub struct NomadHttpConfig {
    /// Presumably the Nomad HTTP API address (the tests use
    /// `http://host:4646`).
    pub address: String,
    /// Optional access token. Treated as a secret: redacted in `Debug`.
    pub token: Option<String>,
    /// Region label; `NomadHttpBackend::new` formats it into the store name
    /// `nomad-<region>`.
    pub region: String,
}

impl std::fmt::Debug for NomadHttpConfig {
    /// Same layout as the derived output, except that a present `token` is
    /// shown as `Some("<redacted>")` instead of its value.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Keep the Some/None distinction visible without exposing the secret.
        let token = self.token.as_ref().map(|_| "<redacted>");
        f.debug_struct("NomadHttpConfig")
            .field("address", &self.address)
            .field("token", &token)
            .field("region", &self.region)
            .finish()
    }
}
/// Backend reported under the name `"nomad-http"`.
///
/// In the code visible here it is a thin wrapper: all storage operations are
/// served by the embedded [`InMemoryStore`], and the configuration is only
/// retained for inspection.
pub struct NomadHttpBackend {
    /// Configuration supplied at construction time.
    config: NomadHttpConfig,
    /// Store that actually holds the resources.
    store: InMemoryStore,
}
impl NomadHttpBackend {
#[must_use]
pub fn new(config: NomadHttpConfig) -> Self {
let face_name = format!("nomad-{}", config.region);
let mut store = InMemoryStore::new(face_name);
store.set_adapters(AdapterRegistry::default());
Self { config, store }
}
#[must_use]
pub fn config(&self) -> &NomadHttpConfig {
&self.config
}
}
/// [`StoreBackend`] for [`NomadHttpBackend`]: every operation except `name`
/// is forwarded unchanged to the wrapped [`InMemoryStore`].
impl StoreBackend for NomadHttpBackend {
    /// Fixed identifier of this backend kind.
    fn name(&self) -> &str {
        const BACKEND_NAME: &str = "nomad-http";
        BACKEND_NAME
    }

    /// Forwards to the inner store's `apply`.
    fn apply(&self, format: ResourceFormat, body: &[u8]) -> Result<(), FaceError> {
        let Self { store, .. } = self;
        store.apply(format, body)
    }

    /// Forwards to the inner store's `get`.
    fn get(
        &self,
        reference: &ResourceRef,
        format: ResourceFormat,
    ) -> Result<Vec<u8>, FaceError> {
        let Self { store, .. } = self;
        store.get(reference, format)
    }

    /// Forwards to the inner store's `list`.
    fn list(
        &self,
        kind: &str,
        namespace: Option<&str>,
        format: ResourceFormat,
    ) -> Result<Vec<Vec<u8>>, FaceError> {
        let Self { store, .. } = self;
        store.list(kind, namespace, format)
    }

    /// Forwards to the inner store's `delete`.
    fn delete(&self, reference: &ResourceRef) -> Result<(), FaceError> {
        let Self { store, .. } = self;
        store.delete(reference)
    }

    /// Forwards to the inner store's `watch`.
    fn watch(
        &self,
        kind: &str,
        namespace: Option<&str>,
        format: ResourceFormat,
    ) -> Result<Box<dyn FaceWatchStream>, FaceError> {
        let Self { store, .. } = self;
        store.watch(kind, namespace, format)
    }

    /// Reports the inner store's `len()`.
    fn resource_count(&self) -> usize {
        let Self { store, .. } = self;
        store.len()
    }

    /// Reports the inner store's `subscriber_count()`.
    fn subscriber_count(&self) -> usize {
        let Self { store, .. } = self;
        store.subscriber_count()
    }

    /// Forwards to the inner store's `snapshot`.
    fn snapshot(&self) -> Result<Vec<u8>, FaceError> {
        let Self { store, .. } = self;
        store.snapshot()
    }

    /// Forwards to the inner store's `restore`.
    fn restore(&self, snapshot_bytes: &[u8]) -> Result<(), FaceError> {
        let Self { store, .. } = self;
        store.restore(snapshot_bytes)
    }
}
/// Settings carried by a [`SystemdDbusBackend`].
#[derive(Debug, Clone)]
pub struct SystemdDbusConfig {
    /// Chooses the backing store's name in `SystemdDbusBackend::new`:
    /// `systemd-user` when `true`, `systemd-system` otherwise.
    pub user_units: bool,
    /// Stored and exposed via `SystemdDbusBackend::config`, but not read by
    /// any code in this file. NOTE(review): intended semantics to be
    /// confirmed with the eventual consumer.
    pub auto_reload: bool,
}
/// Backend reported under the name `"systemd-dbus"`.
///
/// In the code visible here it is a thin wrapper: all storage operations are
/// served by the embedded [`InMemoryStore`], and the configuration is only
/// retained for inspection.
pub struct SystemdDbusBackend {
    /// Configuration supplied at construction time.
    config: SystemdDbusConfig,
    /// Store that actually holds the resources.
    store: InMemoryStore,
}
impl SystemdDbusBackend {
#[must_use]
pub fn new(config: SystemdDbusConfig) -> Self {
let face_name = if config.user_units {
"systemd-user".to_string()
} else {
"systemd-system".to_string()
};
let mut store = InMemoryStore::new(face_name);
store.set_adapters(AdapterRegistry::default());
Self { config, store }
}
#[must_use]
pub fn config(&self) -> &SystemdDbusConfig {
&self.config
}
}
/// [`StoreBackend`] for [`SystemdDbusBackend`]: every operation except `name`
/// is forwarded unchanged to the wrapped [`InMemoryStore`].
impl StoreBackend for SystemdDbusBackend {
    /// Fixed identifier of this backend kind.
    fn name(&self) -> &str {
        const BACKEND_NAME: &str = "systemd-dbus";
        BACKEND_NAME
    }

    /// Forwards to the inner store's `apply`.
    fn apply(&self, format: ResourceFormat, body: &[u8]) -> Result<(), FaceError> {
        let Self { store, .. } = self;
        store.apply(format, body)
    }

    /// Forwards to the inner store's `get`.
    fn get(
        &self,
        reference: &ResourceRef,
        format: ResourceFormat,
    ) -> Result<Vec<u8>, FaceError> {
        let Self { store, .. } = self;
        store.get(reference, format)
    }

    /// Forwards to the inner store's `list`.
    fn list(
        &self,
        kind: &str,
        namespace: Option<&str>,
        format: ResourceFormat,
    ) -> Result<Vec<Vec<u8>>, FaceError> {
        let Self { store, .. } = self;
        store.list(kind, namespace, format)
    }

    /// Forwards to the inner store's `delete`.
    fn delete(&self, reference: &ResourceRef) -> Result<(), FaceError> {
        let Self { store, .. } = self;
        store.delete(reference)
    }

    /// Forwards to the inner store's `watch`.
    fn watch(
        &self,
        kind: &str,
        namespace: Option<&str>,
        format: ResourceFormat,
    ) -> Result<Box<dyn FaceWatchStream>, FaceError> {
        let Self { store, .. } = self;
        store.watch(kind, namespace, format)
    }

    /// Reports the inner store's `len()`.
    fn resource_count(&self) -> usize {
        let Self { store, .. } = self;
        store.len()
    }

    /// Reports the inner store's `subscriber_count()`.
    fn subscriber_count(&self) -> usize {
        let Self { store, .. } = self;
        store.subscriber_count()
    }

    /// Forwards to the inner store's `snapshot`.
    fn snapshot(&self) -> Result<Vec<u8>, FaceError> {
        let Self { store, .. } = self;
        store.snapshot()
    }

    /// Forwards to the inner store's `restore`.
    fn restore(&self, snapshot_bytes: &[u8]) -> Result<(), FaceError> {
        let Self { store, .. } = self;
        store.restore(snapshot_bytes)
    }
}
/// Settings carried by a [`SupervisedSystemdBackend`].
#[derive(Debug, Clone)]
pub struct SupervisedSystemdConfig {
    /// Host label; `SupervisedSystemdBackend::new` formats it into the store
    /// name `bms-<hostname>`.
    pub hostname: String,
    /// Stored and exposed via `SupervisedSystemdBackend::config`, but not
    /// read by any code in this file. Presumably a container runtime name
    /// (the tests pass `podman` / `docker`) — TODO(review): confirm.
    pub runtime: String,
}
/// Backend reported under the name `"supervised-systemd"`.
///
/// In the code visible here it is a thin wrapper: all storage operations are
/// served by the embedded [`InMemoryStore`], and the configuration is only
/// retained for inspection.
pub struct SupervisedSystemdBackend {
    /// Configuration supplied at construction time.
    config: SupervisedSystemdConfig,
    /// Store that actually holds the resources.
    store: InMemoryStore,
}
impl SupervisedSystemdBackend {
#[must_use]
pub fn new(config: SupervisedSystemdConfig) -> Self {
let face_name = format!("bms-{}", config.hostname);
let mut store = InMemoryStore::new(face_name);
store.set_adapters(AdapterRegistry::default());
Self { config, store }
}
#[must_use]
pub fn config(&self) -> &SupervisedSystemdConfig {
&self.config
}
}
/// [`StoreBackend`] for [`SupervisedSystemdBackend`]: every operation except
/// `name` is forwarded unchanged to the wrapped [`InMemoryStore`].
impl StoreBackend for SupervisedSystemdBackend {
    /// Fixed identifier of this backend kind.
    fn name(&self) -> &str {
        const BACKEND_NAME: &str = "supervised-systemd";
        BACKEND_NAME
    }

    /// Forwards to the inner store's `apply`.
    fn apply(&self, format: ResourceFormat, body: &[u8]) -> Result<(), FaceError> {
        let Self { store, .. } = self;
        store.apply(format, body)
    }

    /// Forwards to the inner store's `get`.
    fn get(
        &self,
        reference: &ResourceRef,
        format: ResourceFormat,
    ) -> Result<Vec<u8>, FaceError> {
        let Self { store, .. } = self;
        store.get(reference, format)
    }

    /// Forwards to the inner store's `list`.
    fn list(
        &self,
        kind: &str,
        namespace: Option<&str>,
        format: ResourceFormat,
    ) -> Result<Vec<Vec<u8>>, FaceError> {
        let Self { store, .. } = self;
        store.list(kind, namespace, format)
    }

    /// Forwards to the inner store's `delete`.
    fn delete(&self, reference: &ResourceRef) -> Result<(), FaceError> {
        let Self { store, .. } = self;
        store.delete(reference)
    }

    /// Forwards to the inner store's `watch`.
    fn watch(
        &self,
        kind: &str,
        namespace: Option<&str>,
        format: ResourceFormat,
    ) -> Result<Box<dyn FaceWatchStream>, FaceError> {
        let Self { store, .. } = self;
        store.watch(kind, namespace, format)
    }

    /// Reports the inner store's `len()`.
    fn resource_count(&self) -> usize {
        let Self { store, .. } = self;
        store.len()
    }

    /// Reports the inner store's `subscriber_count()`.
    fn subscriber_count(&self) -> usize {
        let Self { store, .. } = self;
        store.subscriber_count()
    }

    /// Forwards to the inner store's `snapshot`.
    fn snapshot(&self) -> Result<Vec<u8>, FaceError> {
        let Self { store, .. } = self;
        store.snapshot()
    }

    /// Forwards to the inner store's `restore`.
    fn restore(&self, snapshot_bytes: &[u8]) -> Result<(), FaceError> {
        let Self { store, .. } = self;
        store.restore(snapshot_bytes)
    }
}
// Unit tests for the five backends defined above. Each backend gets a
// construction test and a verb-dispatch test; the remaining tests check
// properties that must hold across all of them.
#[cfg(test)]
mod tests {
use super::*;
use crate::face::encode_native_envelope;
// Shared fixture: the resource reference used by every test.
// NOTE(review): argument order looks like (kind, name, namespace) — confirm
// against `ResourceRef::namespaced`.
fn pod_ref() -> ResourceRef {
ResourceRef::namespaced("Pod", "nginx", "default")
}
// Shared fixture: `pod_ref()` encoded together with the bytes `payload` in
// the native envelope format. Panics if encoding fails.
fn envelope() -> Vec<u8> {
encode_native_envelope(&pod_ref(), b"payload").unwrap()
}
// `new` keeps the supplied config and reports the fixed name "openraft".
#[test]
fn raft_backend_constructs_from_config() {
let backend = RaftBackend::new(RaftConfig {
node_id: 7,
peers: vec!["10.0.0.1:7000".into(), "10.0.0.2:7000".into()],
log_dir: "/var/lib/engenho/raft".into(),
});
assert_eq!(backend.name(), "openraft");
assert_eq!(backend.config().node_id, 7);
assert_eq!(backend.config().peers.len(), 2);
}
// Exercises apply, get, list, watch and delete in that order on one backend.
#[test]
fn raft_backend_dispatches_5_verbs() {
let backend = RaftBackend::new(RaftConfig {
node_id: 1,
peers: vec![],
log_dir: "/tmp".into(),
});
backend.apply(ResourceFormat::Native, &envelope()).unwrap();
let r = pod_ref();
// A native-format get must hand back exactly the bytes that were applied.
assert_eq!(backend.get(&r, ResourceFormat::Native).unwrap(), envelope());
assert_eq!(backend.list("Pod", Some("default"), ResourceFormat::Native).unwrap().len(), 1);
// Only checks that a watch can be opened; the stream itself is dropped.
let _ = backend.watch("Pod", None, ResourceFormat::Native).unwrap();
backend.delete(&r).unwrap();
}
// `new` keeps the supplied config and reports the fixed name "kube-apiserver".
#[test]
fn kube_apiserver_backend_constructs_from_config() {
let backend = KubeApiServerBackend::new(KubeApiServerConfig {
endpoint: "https://k.svc:6443".into(),
kubeconfig: Some("/etc/kubernetes/admin.conf".into()),
bearer_token: None,
api_version: "1.34".into(),
});
assert_eq!(backend.name(), "kube-apiserver");
assert_eq!(backend.config().api_version, "1.34");
}
// Despite the name, only `apply` and `resource_count` are exercised here.
#[test]
fn kube_apiserver_backend_dispatches_verbs() {
let backend = KubeApiServerBackend::new(KubeApiServerConfig {
endpoint: "https://k:6443".into(),
kubeconfig: None,
bearer_token: Some("tok".into()),
api_version: "1.34".into(),
});
backend.apply(ResourceFormat::Native, &envelope()).unwrap();
assert_eq!(backend.resource_count(), 1);
}
// `new` keeps the supplied config and reports the fixed name "nomad-http".
#[test]
fn nomad_http_backend_constructs_from_config() {
let backend = NomadHttpBackend::new(NomadHttpConfig {
address: "http://127.0.0.1:4646".into(),
token: None,
region: "global".into(),
});
assert_eq!(backend.name(), "nomad-http");
assert_eq!(backend.config().region, "global");
}
// Despite the name, only `apply` and `resource_count` are exercised here.
#[test]
fn nomad_http_backend_dispatches_verbs() {
let backend = NomadHttpBackend::new(NomadHttpConfig {
address: "http://n:4646".into(),
token: None,
region: "us-east".into(),
});
backend.apply(ResourceFormat::Native, &envelope()).unwrap();
assert_eq!(backend.resource_count(), 1);
}
// `new` keeps the supplied config and reports the fixed name "systemd-dbus".
#[test]
fn systemd_dbus_backend_constructs_from_config() {
let backend = SystemdDbusBackend::new(SystemdDbusConfig {
user_units: false,
auto_reload: true,
});
assert_eq!(backend.name(), "systemd-dbus");
assert!(!backend.config().user_units);
}
// Despite the name, only `apply` and `resource_count` are exercised here.
#[test]
fn systemd_dbus_backend_dispatches_verbs() {
let backend = SystemdDbusBackend::new(SystemdDbusConfig {
user_units: true,
auto_reload: false,
});
backend.apply(ResourceFormat::Native, &envelope()).unwrap();
assert_eq!(backend.resource_count(), 1);
}
// `new` keeps the supplied config and reports the fixed name
// "supervised-systemd".
#[test]
fn supervised_systemd_backend_constructs_from_config() {
let backend = SupervisedSystemdBackend::new(SupervisedSystemdConfig {
hostname: "host01".into(),
runtime: "podman".into(),
});
assert_eq!(backend.name(), "supervised-systemd");
assert_eq!(backend.config().runtime, "podman");
}
// Despite the name, only `apply` and `resource_count` are exercised here.
#[test]
fn supervised_systemd_backend_dispatches_verbs() {
let backend = SupervisedSystemdBackend::new(SupervisedSystemdConfig {
hostname: "h".into(),
runtime: "docker".into(),
});
backend.apply(ResourceFormat::Native, &envelope()).unwrap();
assert_eq!(backend.resource_count(), 1);
}
// Applies the same envelope to a plain `InMemoryStore` and to all five
// backends, then checks that (1) every snapshot is byte-identical to the
// first one and (2) that snapshot can be restored into each of the five
// wrapper backends after their content was deleted.
#[test]
fn snapshot_round_trips_across_every_backend_pair() {
let backends: Vec<Box<dyn StoreBackend>> = vec![
Box::new(InMemoryStore::new("mem")),
Box::new(RaftBackend::new(RaftConfig {
node_id: 1,
peers: vec![],
log_dir: "/tmp".into(),
})),
Box::new(KubeApiServerBackend::new(KubeApiServerConfig {
endpoint: "https://k:6443".into(),
kubeconfig: None,
bearer_token: None,
api_version: "1.34".into(),
})),
Box::new(NomadHttpBackend::new(NomadHttpConfig {
address: "http://n:4646".into(),
token: None,
region: "g".into(),
})),
Box::new(SystemdDbusBackend::new(SystemdDbusConfig {
user_units: false,
auto_reload: false,
})),
Box::new(SupervisedSystemdBackend::new(SupervisedSystemdConfig {
hostname: "h".into(),
runtime: "podman".into(),
})),
];
for b in &backends {
b.apply(ResourceFormat::Native, &envelope()).unwrap();
}
let snaps: Vec<Vec<u8>> = backends.iter().map(|b| b.snapshot().unwrap()).collect();
// Index 0 (the plain in-memory store) is the reference snapshot.
for i in 1..snaps.len() {
assert_eq!(
snaps[i], snaps[0],
"backend {} snapshot diverged from backend 0",
backends[i].name(),
);
}
let snap = snaps.into_iter().next().unwrap();
// The reference store at index 0 is skipped; it is never emptied or restored.
for b in &backends[1..] {
let r = pod_ref();
// The delete result is deliberately ignored; the count assertion on the
// next line is what verifies the store is empty.
let _ = b.delete(&r);
assert_eq!(b.resource_count(), 0);
b.restore(&snap).unwrap();
assert_eq!(b.resource_count(), 1, "backend {} restore failed", b.name());
}
}
// Compile-time check: the test body does nothing at runtime, it only has to
// type-check with the `Send + Sync + 'static` bounds.
#[test]
fn every_backend_implements_send_sync_static() {
fn assert_send_sync_static<T: Send + Sync + 'static>() {}
assert_send_sync_static::<RaftBackend>();
assert_send_sync_static::<KubeApiServerBackend>();
assert_send_sync_static::<NomadHttpBackend>();
assert_send_sync_static::<SystemdDbusBackend>();
assert_send_sync_static::<SupervisedSystemdBackend>();
}
// Pins the `name()` string of each backend when used as a trait object.
#[test]
fn every_backend_carries_named_identity() {
let backends: Vec<(&'static str, Box<dyn StoreBackend>)> = vec![
(
"openraft",
Box::new(RaftBackend::new(RaftConfig {
node_id: 1,
peers: vec![],
log_dir: "/tmp".into(),
})),
),
(
"kube-apiserver",
Box::new(KubeApiServerBackend::new(KubeApiServerConfig {
endpoint: "x".into(),
kubeconfig: None,
bearer_token: None,
api_version: "1".into(),
})),
),
(
"nomad-http",
Box::new(NomadHttpBackend::new(NomadHttpConfig {
address: "x".into(),
token: None,
region: "g".into(),
})),
),
(
"systemd-dbus",
Box::new(SystemdDbusBackend::new(SystemdDbusConfig {
user_units: false,
auto_reload: false,
})),
),
(
"supervised-systemd",
Box::new(SupervisedSystemdBackend::new(SupervisedSystemdConfig {
hostname: "h".into(),
runtime: "p".into(),
})),
),
];
for (expected_name, backend) in backends {
assert_eq!(backend.name(), expected_name);
}
}
}