use async_trait::async_trait;
use serde_json::Value;
use thiserror::Error;
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize)]
pub struct ObjectRef {
pub api_version: String,
pub kind: String,
pub namespace: Option<String>,
pub name: String,
}
impl ObjectRef {
pub fn from_manifest(manifest: &Value) -> Result<Self, K8sClusterError> {
Ok(Self {
api_version: manifest_field(manifest, &["apiVersion"])?,
kind: manifest_field(manifest, &["kind"])?,
namespace: manifest
.get("metadata")
.and_then(|m| m.get("namespace"))
.and_then(Value::as_str)
.map(str::to_string),
name: manifest_field(manifest, &["metadata", "name"])?,
})
}
}
pub(super) fn manifest_field(manifest: &Value, path: &[&str]) -> Result<String, K8sClusterError> {
let mut cur = manifest;
for p in path {
cur = cur.get(p).ok_or_else(|| {
K8sClusterError::InvalidManifest(format!("manifest is missing `{}`", path.join(".")))
})?;
}
cur.as_str().map(str::to_string).ok_or_else(|| {
K8sClusterError::InvalidManifest(format!("`{}` is not a string", path.join(".")))
})
}
impl std::fmt::Display for ObjectRef {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self.namespace {
Some(ns) => write!(f, "{}/{} {}/{}", self.api_version, self.kind, ns, self.name),
None => write!(f, "{}/{} {}", self.api_version, self.kind, self.name),
}
}
}
#[derive(Debug, Error)]
pub enum K8sClusterError {
#[error(
"no Kubernetes API client is bound to the K8s deployer env-pack; \
binding a connected cluster client rides the Phase D orchestration \
wiring (PR-5.3) — until then K8s provider verbs cannot run"
)]
Unconfigured,
#[error("invalid manifest: {0}")]
InvalidManifest(String),
#[error("Kubernetes API error: {0}")]
Api(String),
#[error(
"refusing to apply `{object}` in namespace `{namespace}`: \
it is owned by env `{existing_env}` but this apply belongs to env `{incoming_env}`"
)]
OwnershipConflict {
object: String,
namespace: String,
existing_env: String,
incoming_env: String,
},
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RolloutStatus {
pub generation: i64,
pub observed_generation: Option<i64>,
pub replicas: i32,
pub updated_replicas: i32,
pub available_replicas: i32,
}
impl RolloutStatus {
pub fn is_complete(&self, desired: i32) -> bool {
self.observed_generation
.is_some_and(|observed| observed >= self.generation)
&& self.updated_replicas >= desired
&& self.replicas <= self.updated_replicas
&& self.available_replicas >= desired
}
}
#[async_trait]
pub trait K8sCluster: std::fmt::Debug + Send + Sync {
async fn apply(&self, manifest: &Value) -> Result<(), K8sClusterError>;
async fn delete(&self, object: &ObjectRef) -> Result<(), K8sClusterError>;
async fn get_rollout_status(
&self,
deployment: &ObjectRef,
) -> Result<RolloutStatus, K8sClusterError>;
}
#[derive(Debug, Default)]
pub struct UnconfiguredCluster;
#[async_trait]
impl K8sCluster for UnconfiguredCluster {
async fn apply(&self, _manifest: &Value) -> Result<(), K8sClusterError> {
Err(K8sClusterError::Unconfigured)
}
async fn delete(&self, _object: &ObjectRef) -> Result<(), K8sClusterError> {
Err(K8sClusterError::Unconfigured)
}
async fn get_rollout_status(
&self,
_deployment: &ObjectRef,
) -> Result<RolloutStatus, K8sClusterError> {
Err(K8sClusterError::Unconfigured)
}
}
#[cfg(test)]
#[derive(Debug, Default)]
pub struct InMemoryCluster {
objects: std::sync::Mutex<std::collections::BTreeMap<ObjectRef, Value>>,
}
#[cfg(test)]
impl InMemoryCluster {
pub fn objects(&self) -> std::collections::BTreeMap<ObjectRef, Value> {
self.objects.lock().expect("mutex not poisoned").clone()
}
}
#[cfg(test)]
#[async_trait]
impl K8sCluster for InMemoryCluster {
async fn apply(&self, manifest: &Value) -> Result<(), K8sClusterError> {
let object = ObjectRef::from_manifest(manifest)?;
self.objects
.lock()
.expect("mutex not poisoned")
.insert(object, manifest.clone());
Ok(())
}
async fn delete(&self, object: &ObjectRef) -> Result<(), K8sClusterError> {
self.objects
.lock()
.expect("mutex not poisoned")
.remove(object);
Ok(())
}
async fn get_rollout_status(
&self,
_deployment: &ObjectRef,
) -> Result<RolloutStatus, K8sClusterError> {
Ok(RolloutStatus {
generation: 0,
observed_generation: Some(0),
replicas: i32::MAX,
updated_replicas: i32::MAX,
available_replicas: i32::MAX,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
fn manifest() -> Value {
json!({
"apiVersion": "v1",
"kind": "Service",
"metadata": {"name": "svc-a", "namespace": "ns-a"},
})
}
#[test]
fn rollout_complete_when_observed_and_replicas_meet_desired() {
let s = RolloutStatus {
generation: 3,
observed_generation: Some(3),
replicas: 1,
updated_replicas: 1,
available_replicas: 1,
};
assert!(s.is_complete(1));
}
#[test]
fn rollout_incomplete_until_controller_observes_latest_generation() {
let s = RolloutStatus {
generation: 4,
observed_generation: Some(3),
replicas: 1,
updated_replicas: 1,
available_replicas: 1,
};
assert!(!s.is_complete(1));
}
#[test]
fn rollout_incomplete_when_no_status_written_yet() {
let s = RolloutStatus {
generation: 1,
observed_generation: None,
replicas: 0,
updated_replicas: 0,
available_replicas: 0,
};
assert!(!s.is_complete(1));
}
#[test]
fn rollout_incomplete_when_available_replicas_below_desired() {
let s = RolloutStatus {
generation: 2,
observed_generation: Some(2),
replicas: 1,
updated_replicas: 1,
available_replicas: 0,
};
assert!(!s.is_complete(1));
}
#[test]
fn rollout_incomplete_when_only_old_replicaset_is_available() {
let s = RolloutStatus {
generation: 2,
observed_generation: Some(2),
replicas: 1,
updated_replicas: 0,
available_replicas: 1,
};
assert!(!s.is_complete(1));
}
#[test]
fn rollout_incomplete_while_old_replicas_linger_during_surge() {
let s = RolloutStatus {
generation: 3,
observed_generation: Some(3),
replicas: 2,
updated_replicas: 1,
available_replicas: 2,
};
assert!(!s.is_complete(1));
}
#[test]
fn object_ref_extracts_identity_from_manifest() {
let r = ObjectRef::from_manifest(&manifest()).unwrap();
assert_eq!(
r,
ObjectRef {
api_version: "v1".into(),
kind: "Service".into(),
namespace: Some("ns-a".into()),
name: "svc-a".into(),
}
);
}
#[test]
fn object_ref_without_namespace_is_cluster_scoped() {
let m = json!({"apiVersion": "v1", "kind": "Namespace", "metadata": {"name": "gtc-zain"}});
let r = ObjectRef::from_manifest(&m).unwrap();
assert_eq!(r.namespace, None);
assert_eq!(r.kind, "Namespace");
}
#[test]
fn object_ref_rejects_manifest_without_name() {
let m = json!({"apiVersion": "v1", "kind": "Service", "metadata": {"namespace": "ns"}});
let err = ObjectRef::from_manifest(&m).unwrap_err();
assert!(
matches!(err, K8sClusterError::InvalidManifest(ref msg) if msg.contains("metadata.name")),
"got {err:?}"
);
}
#[tokio::test]
async fn unconfigured_cluster_fails_both_verbs() {
let c = UnconfiguredCluster;
assert!(matches!(
c.apply(&manifest()).await.unwrap_err(),
K8sClusterError::Unconfigured
));
let r = ObjectRef::from_manifest(&manifest()).unwrap();
assert!(matches!(
c.delete(&r).await.unwrap_err(),
K8sClusterError::Unconfigured
));
}
#[tokio::test]
async fn in_memory_cluster_upserts_and_deletes_idempotently() {
let c = InMemoryCluster::default();
c.apply(&manifest()).await.unwrap();
c.apply(&manifest()).await.unwrap();
assert_eq!(c.objects().len(), 1, "apply is an upsert");
let r = ObjectRef::from_manifest(&manifest()).unwrap();
c.delete(&r).await.unwrap();
c.delete(&r).await.unwrap();
assert!(c.objects().is_empty(), "delete of absent is Ok");
}
}