use async_trait::async_trait;
use k8s_openapi::api::apps::v1::Deployment;
use k8s_openapi::api::authentication::v1::SelfSubjectReview;
use k8s_openapi::api::authorization::v1::{
ResourceAttributes, SelfSubjectAccessReview, SelfSubjectAccessReviewSpec,
};
use kube::api::{Api, ApiResource, DeleteParams, DynamicObject, Patch, PatchParams, PostParams};
use kube::config::KubeConfigOptions;
use serde_json::Value;
use super::cluster::{K8sCluster, K8sClusterError, ObjectRef, RolloutStatus, manifest_field};
use super::credentials::{
AccessDecision, ClusterIdentity, K8sClientError, K8sOperation, K8sValidatorClient,
OperationDecision,
};
use super::manifests::ENV_LABEL;
/// Field-manager name passed to `PatchParams::apply` when this module sends
/// server-side apply patches.
pub const FIELD_MANAGER: &str = "greentic-deployer";
/// Builds a [`kube::Client`] for the target cluster.
///
/// * `kubeconfig_context` — when `Some`, that named context is loaded from the
///   kubeconfig; when `None`, the configuration comes from `kube::Config::infer`.
/// * `bound_token` — when `Some`, stored as the configuration's bearer token
///   before the client is built (see `apply_bound_token`).
///
/// # Errors
///
/// Every failure — loading the configuration or constructing the client — is
/// reported as [`K8sClientError::NoClusterAccess`].
pub async fn connect(
    kubeconfig_context: Option<&str>,
    bound_token: Option<&str>,
) -> Result<kube::Client, K8sClientError> {
    // A rustls crypto provider has to be in place before any TLS client is built.
    install_default_crypto_provider();

    let loaded = if let Some(context) = kubeconfig_context {
        let options = KubeConfigOptions {
            context: Some(context.to_string()),
            ..Default::default()
        };
        kube::Config::from_kubeconfig(&options).await.map_err(|e| {
            K8sClientError::NoClusterAccess(format!("kubeconfig context `{context}`: {e}"))
        })
    } else {
        kube::Config::infer()
            .await
            .map_err(|e| K8sClientError::NoClusterAccess(e.to_string()))
    };

    let mut config = loaded?;
    apply_bound_token(&mut config, bound_token);

    match kube::Client::try_from(config) {
        Ok(client) => Ok(client),
        Err(e) => Err(K8sClientError::NoClusterAccess(e.to_string())),
    }
}
/// Installs the `ring` rustls crypto provider as the process default, unless a
/// default provider is already registered.
fn install_default_crypto_provider() {
    if rustls::crypto::CryptoProvider::get_default().is_some() {
        return;
    }
    // The install result is discarded on purpose: this is best-effort.
    // NOTE(review): presumably the only failure is another provider being
    // installed in the meantime — confirm against the rustls documentation.
    let _ = rustls::crypto::ring::default_provider().install_default();
}
/// Stores `token` as the bearer token of `config`; a `None` token leaves the
/// configuration untouched.
///
/// NOTE(review): only `auth_info.token` is written. Any other credential the
/// loaded kubeconfig carries (client certificate, exec plugin, basic auth, …)
/// stays in place — confirm the bound token actually takes precedence for the
/// kubeconfig shapes this deployer supports.
fn apply_bound_token(config: &mut kube::Config, token: Option<&str>) {
    let Some(bound) = token else {
        return;
    };
    config.auth_info.token = Some(bound.into());
}
/// Whether a routed Kubernetes kind is addressed inside a namespace or
/// cluster-wide.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Scope {
/// The kind lives in a namespace; `dynamic_api` builds a namespaced handle.
Namespaced,
/// The kind is cluster-scoped; `dynamic_api` builds an all-cluster handle.
Cluster,
}
/// Resolves an `apiVersion`/`kind` pair to the [`ApiResource`] and [`Scope`]
/// used to address it.
///
/// The routing table is closed: only the pairs listed here are accepted.
///
/// # Errors
///
/// Returns [`K8sClusterError::InvalidManifest`] for any pair outside the table.
fn api_route_for(api_version: &str, kind: &str) -> Result<(ApiResource, Scope), K8sClusterError> {
    let route = match (api_version, kind) {
        ("v1", "Namespace") => Some(("namespaces", Scope::Cluster)),
        ("v1", "Service") => Some(("services", Scope::Namespaced)),
        ("v1", "ConfigMap") => Some(("configmaps", Scope::Namespaced)),
        ("apps/v1", "Deployment") => Some(("deployments", Scope::Namespaced)),
        ("policy/v1", "PodDisruptionBudget") => Some(("poddisruptionbudgets", Scope::Namespaced)),
        ("networking.k8s.io/v1", "NetworkPolicy") => Some(("networkpolicies", Scope::Namespaced)),
        _ => None,
    };
    let Some((plural, scope)) = route else {
        return Err(K8sClusterError::InvalidManifest(format!(
            "unsupported object `{api_version}/{kind}` — the deployer's routing table \
             covers exactly the kinds the manifest renderer emits; extend \
             `api_route_for` alongside the renderer"
        )));
    };

    // An `apiVersion` without a `/` has an empty group and is entirely the version.
    let (group, version) = api_version.split_once('/').unwrap_or(("", api_version));

    let resource = ApiResource {
        group: group.to_string(),
        version: version.to_string(),
        api_version: api_version.to_string(),
        kind: kind.to_string(),
        plural: plural.to_string(),
    };
    Ok((resource, scope))
}
/// Builds the dynamic API handle for `resource`.
///
/// `namespace` is only used for [`Scope::Namespaced`]; cluster-scoped kinds
/// ignore it.
fn dynamic_api(
    client: &kube::Client,
    resource: &ApiResource,
    scope: Scope,
    namespace: &str,
) -> Api<DynamicObject> {
    let owned = client.clone();
    match scope {
        Scope::Namespaced => Api::namespaced_with(owned, namespace, resource),
        Scope::Cluster => Api::all_with(owned, resource),
    }
}
fn map_cluster_error(e: kube::Error) -> K8sClusterError {
match e {
kube::Error::Api(status) => {
K8sClusterError::Api(format!("{} (status {})", status.message, status.code))
}
other => K8sClusterError::Api(other.to_string()),
}
}
/// [`K8sCluster`] implementation that talks to a cluster through a
/// [`kube::Client`].
pub struct KubeCluster {
// Client used for every request this cluster handle issues.
client: kube::Client,
}
impl KubeCluster {
pub fn new(client: kube::Client) -> Self {
Self { client }
}
fn api_for(&self, manifest: &Value) -> Result<(Api<DynamicObject>, String), K8sClusterError> {
let api_version = manifest_field(manifest, &["apiVersion"])?;
let kind = manifest_field(manifest, &["kind"])?;
let name = manifest_field(manifest, &["metadata", "name"])?;
let (resource, scope) = api_route_for(&api_version, &kind)?;
let namespace = match scope {
Scope::Cluster => String::new(),
Scope::Namespaced => manifest_field(manifest, &["metadata", "namespace"])?,
};
Ok((
dynamic_api(&self.client, &resource, scope, &namespace),
name,
))
}
}
/// Debug output names the type only; the wrapped client is not printed.
impl std::fmt::Debug for KubeCluster {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("KubeCluster");
        repr.finish_non_exhaustive()
    }
}
/// Returns the string value of `metadata.labels[ENV_LABEL]`, or `None` when
/// any step of that path is missing or the value is not a string.
fn manifest_env_label(manifest: &Value) -> Option<&str> {
    let metadata = manifest.get("metadata")?;
    let labels = metadata.get("labels")?;
    labels.get(ENV_LABEL)?.as_str()
}
#[async_trait]
impl K8sCluster for KubeCluster {
async fn apply(&self, manifest: &Value) -> Result<(), K8sClusterError> {
let (api, name) = self.api_for(manifest)?;
let incoming_env = manifest_env_label(manifest);
if let Some(existing) = api.get_opt(&name).await.map_err(map_cluster_error)? {
let existing_env = existing
.metadata
.labels
.as_ref()
.and_then(|l| l.get(ENV_LABEL))
.map(String::as_str);
if let (Some(inc), Some(ext)) = (incoming_env, existing_env)
&& inc != ext
{
let namespace = manifest
.pointer("/metadata/namespace")
.and_then(Value::as_str)
.unwrap_or("<cluster-scoped>");
return Err(K8sClusterError::OwnershipConflict {
object: name,
namespace: namespace.to_string(),
existing_env: ext.to_string(),
incoming_env: inc.to_string(),
});
}
}
let params = PatchParams::apply(FIELD_MANAGER).force();
api.patch(&name, ¶ms, &Patch::Apply(manifest))
.await
.map_err(map_cluster_error)?;
Ok(())
}
async fn delete(&self, object: &ObjectRef) -> Result<(), K8sClusterError> {
let (resource, scope) = api_route_for(&object.api_version, &object.kind)?;
let namespace = object.namespace.as_deref().unwrap_or_default();
let api = dynamic_api(&self.client, &resource, scope, namespace);
match api.delete(&object.name, &DeleteParams::default()).await {
Ok(_) => Ok(()),
Err(kube::Error::Api(status)) if status.code == 404 => Ok(()),
Err(e) => Err(map_cluster_error(e)),
}
}
async fn get_rollout_status(
&self,
deployment: &ObjectRef,
) -> Result<RolloutStatus, K8sClusterError> {
let namespace = deployment.namespace.as_deref().unwrap_or_default();
let api: Api<Deployment> = Api::namespaced(self.client.clone(), namespace);
let dep = api.get(&deployment.name).await.map_err(map_cluster_error)?;
let status = dep.status.as_ref();
Ok(RolloutStatus {
generation: dep.metadata.generation.unwrap_or(0),
observed_generation: status.and_then(|s| s.observed_generation),
replicas: status.and_then(|s| s.replicas).unwrap_or(0),
updated_replicas: status.and_then(|s| s.updated_replicas).unwrap_or(0),
available_replicas: status.and_then(|s| s.available_replicas).unwrap_or(0),
})
}
}
fn map_validator_error(e: kube::Error) -> K8sClientError {
match e {
kube::Error::Api(status) => {
K8sClientError::ApiRejected(format!("{} (status {})", status.message, status.code))
}
kube::Error::Auth(e) => K8sClientError::NoClusterAccess(e.to_string()),
other => K8sClientError::Transport(other.to_string()),
}
}
/// [`K8sValidatorClient`] implementation that issues self-subject reviews
/// through a [`kube::Client`].
pub struct KubeValidatorClient {
// Client used for every review request.
client: kube::Client,
}
impl KubeValidatorClient {
pub fn new(client: kube::Client) -> Self {
Self { client }
}
}
/// Debug output names the type only; the wrapped client is not printed.
impl std::fmt::Debug for KubeValidatorClient {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("KubeValidatorClient");
        repr.finish_non_exhaustive()
    }
}
#[async_trait]
impl K8sValidatorClient for KubeValidatorClient {
    /// Asks the cluster who the current credentials authenticate as, by
    /// creating a `SelfSubjectReview`.
    ///
    /// # Errors
    ///
    /// Request failures are classified by `map_validator_error`; a response
    /// without a username is reported as [`K8sClientError::ApiRejected`].
    async fn who_am_i(&self) -> Result<ClusterIdentity, K8sClientError> {
        let reviews: Api<SelfSubjectReview> = Api::all(self.client.clone());
        let answered = reviews
            .create(&PostParams::default(), &SelfSubjectReview::default())
            .await
            .map_err(map_validator_error)?;

        let username = answered
            .status
            .and_then(|s| s.user_info)
            .and_then(|u| u.username);
        let Some(user) = username else {
            return Err(K8sClientError::ApiRejected(
                "SelfSubjectReview response carried no user identity".to_string(),
            ));
        };
        Ok(ClusterIdentity { user })
    }

    /// Probes each operation in `operations`, in order, with one
    /// `SelfSubjectAccessReview` scoped to `namespace`, and returns one
    /// decision per operation in the same order.
    ///
    /// # Errors
    ///
    /// The first failing request aborts the whole review (classified by
    /// `map_validator_error`). A response without a `status` is reported as
    /// [`K8sClientError::ApiRejected`] rather than being read as a decision.
    async fn review_access<'a>(
        &'a self,
        namespace: &'a str,
        operations: &'a [K8sOperation],
    ) -> Result<Vec<OperationDecision>, K8sClientError> {
        let reviews: Api<SelfSubjectAccessReview> = Api::all(self.client.clone());
        let post = PostParams::default();
        let mut decisions = Vec::with_capacity(operations.len());

        for operation in operations {
            let attributes = ResourceAttributes {
                namespace: Some(namespace.to_string()),
                group: Some(operation.group.to_string()),
                resource: Some(operation.resource.to_string()),
                verb: Some(operation.verb.to_string()),
                ..Default::default()
            };
            let request = SelfSubjectAccessReview {
                spec: SelfSubjectAccessReviewSpec {
                    resource_attributes: Some(attributes),
                    ..Default::default()
                },
                ..Default::default()
            };

            let answered = reviews
                .create(&post, &request)
                .await
                .map_err(map_validator_error)?;
            let Some(status) = answered.status else {
                return Err(K8sClientError::ApiRejected(
                    "SelfSubjectAccessReview response carried no status".to_string(),
                ));
            };

            let decision = match (status.allowed, status.reason) {
                (true, _) => AccessDecision::Allowed,
                (false, Some(reason)) => AccessDecision::Denied(reason),
                (false, None) => AccessDecision::Denied("no reason supplied".to_string()),
            };
            decisions.push(OperationDecision {
                operation: *operation,
                decision,
            });
        }
        Ok(decisions)
    }
}
// Unit tests: every test drives the real `KubeCluster` / `KubeValidatorClient`
// against a `tower_test` mock service, so the assertions are on the HTTP
// requests actually produced and on how canned responses are interpreted.
#[cfg(test)]
mod tests {
use super::*;
use http::{Request, Response};
use http_body_util::BodyExt;
use kube::client::Body;
use serde_json::json;
use tower_test::mock::{self, Handle};
// Server side of the mock service: receives requests, sends back responses.
type MockHandle = Handle<Request<Body>, Response<Body>>;
/// Builds a client wired to a mock service (default namespace `default`) and
/// returns it with the handle that plays the API server.
fn mock_client() -> (kube::Client, MockHandle) {
let (service, handle) = mock::pair::<Request<Body>, Response<Body>>();
(kube::Client::new(service, "default"), handle)
}
/// Waits for the next request, answers it with `status` and `body` as JSON,
/// and returns the request for inspection. Panics if no request arrives.
async fn respond_json(handle: &mut MockHandle, status: u16, body: Value) -> Request<Body> {
let (request, send) = handle.next_request().await.expect("a request is sent");
send.send_response(
Response::builder()
.status(status)
.header("content-type", "application/json")
.body(Body::from(serde_json::to_vec(&body).expect("serializable")))
.expect("valid response"),
);
request
}
/// Collects a captured request's body and parses it as JSON.
async fn request_body_json(request: Request<Body>) -> Value {
let bytes = request
.into_body()
.collect()
.await
.expect("request body readable")
.to_bytes();
serde_json::from_slice(&bytes).expect("request body is JSON")
}
/// A namespaced Deployment manifest labelled for environment `gtc-zain`.
fn deployment_manifest() -> Value {
json!({
"apiVersion": "apps/v1",
"kind": "Deployment",
"metadata": {
"name": "gtc-worker-a",
"namespace": "gtc-zain",
"labels": {"greentic.ai/env": "gtc-zain"},
},
"spec": {"replicas": 1},
})
}
/// A 404 `Status` body, used to answer the pre-apply GET as "object absent".
fn not_found_status() -> Value {
json!({
"kind": "Status",
"apiVersion": "v1",
"status": "Failure",
"code": 404,
"reason": "NotFound",
"message": "not found",
})
}
/// Runs `apply` for an object that does not exist yet: answers the GET with
/// 404, answers the PATCH with 200, asserts success, and returns the PATCH
/// request.
async fn apply_ok(
cluster: &KubeCluster,
handle: &mut MockHandle,
manifest: &Value,
) -> Request<Body> {
let respond = async {
let _get = respond_json(handle, 404, not_found_status()).await;
respond_json(handle, 200, manifest.clone()).await
};
let (result, patch) = tokio::join!(cluster.apply(manifest), respond);
result.unwrap();
patch
}
/// The PATCH must target the object URL, carry the field manager and
/// `force=true`, use the apply-patch content type, and send the manifest
/// unchanged as its body.
#[tokio::test]
async fn apply_is_forced_server_side_apply_with_field_manager() {
let (client, mut handle) = mock_client();
let cluster = KubeCluster::new(client);
let manifest = deployment_manifest();
let request = apply_ok(&cluster, &mut handle, &manifest).await;
assert_eq!(request.method(), http::Method::PATCH);
assert_eq!(
request.uri().path(),
"/apis/apps/v1/namespaces/gtc-zain/deployments/gtc-worker-a"
);
let query = request.uri().query().expect("apply carries query params");
assert!(
query.contains("fieldManager=greentic-deployer"),
"field manager must identify the deployer: {query}"
);
assert!(query.contains("force=true"), "apply must force: {query}");
assert_eq!(
request
.headers()
.get("content-type")
.and_then(|v| v.to_str().ok()),
Some("application/apply-patch+yaml"),
"server-side apply content type"
);
assert_eq!(
request_body_json(request).await,
manifest,
"the rendered manifest IS the patch body"
);
}
/// A `v1` Namespace is cluster-scoped and routed under `/api/v1`.
#[tokio::test]
async fn apply_routes_cluster_scoped_namespace_via_core_api() {
let (client, mut handle) = mock_client();
let cluster = KubeCluster::new(client);
let manifest = json!({
"apiVersion": "v1",
"kind": "Namespace",
"metadata": {"name": "gtc-zain"},
});
let request = apply_ok(&cluster, &mut handle, &manifest).await;
assert_eq!(request.uri().path(), "/api/v1/namespaces/gtc-zain");
}
/// NetworkPolicy and PodDisruptionBudget must use the plurals from the
/// routing table (`networkpolicies`, `poddisruptionbudgets`).
#[tokio::test]
async fn apply_uses_the_irregular_plurals() {
let (client, mut handle) = mock_client();
let cluster = KubeCluster::new(client);
let netpol = json!({
"apiVersion": "networking.k8s.io/v1",
"kind": "NetworkPolicy",
"metadata": {"name": "deny-all", "namespace": "gtc-zain"},
});
let request = apply_ok(&cluster, &mut handle, &netpol).await;
assert_eq!(
request.uri().path(),
"/apis/networking.k8s.io/v1/namespaces/gtc-zain/networkpolicies/deny-all"
);
let pdb = json!({
"apiVersion": "policy/v1",
"kind": "PodDisruptionBudget",
"metadata": {"name": "router", "namespace": "gtc-zain"},
});
let request = apply_ok(&cluster, &mut handle, &pdb).await;
assert_eq!(
request.uri().path(),
"/apis/policy/v1/namespaces/gtc-zain/poddisruptionbudgets/router"
);
}
/// A kind outside the routing table is rejected as `InvalidManifest`; the
/// mock is never asked to answer anything.
#[tokio::test]
async fn apply_rejects_a_kind_outside_the_routing_table() {
let (client, _handle) = mock_client();
let cluster = KubeCluster::new(client);
let manifest = json!({
"apiVersion": "networking.k8s.io/v1",
"kind": "Ingress",
"metadata": {"name": "x", "namespace": "ns"},
});
let err = cluster.apply(&manifest).await.unwrap_err();
assert!(
matches!(err, K8sClusterError::InvalidManifest(ref msg)
if msg.contains("unsupported object `networking.k8s.io/v1/Ingress`")),
"no request may be guessed for an unrendered kind, got {err:?}"
);
}
/// A namespaced kind without `metadata.namespace` is rejected as
/// `InvalidManifest` naming the missing field.
#[tokio::test]
async fn apply_requires_namespace_on_namespaced_kinds() {
let (client, _handle) = mock_client();
let cluster = KubeCluster::new(client);
let manifest = json!({
"apiVersion": "v1",
"kind": "Service",
"metadata": {"name": "svc"},
});
let err = cluster.apply(&manifest).await.unwrap_err();
assert!(
matches!(err, K8sClusterError::InvalidManifest(ref msg)
if msg.contains("metadata.namespace")),
"got {err:?}"
);
}
/// Reference to the Deployment `gtc-worker-a` in namespace `gtc-zain`.
fn worker_object_ref() -> ObjectRef {
ObjectRef {
api_version: "apps/v1".into(),
kind: "Deployment".into(),
namespace: Some("gtc-zain".into()),
name: "gtc-worker-a".into(),
}
}
/// `delete` issues an HTTP DELETE against the object's URL.
#[tokio::test]
async fn delete_sends_delete_to_the_object_url() {
let (client, mut handle) = mock_client();
let cluster = KubeCluster::new(client);
let object = worker_object_ref();
let (result, request) = tokio::join!(
cluster.delete(&object),
respond_json(
&mut handle,
200,
json!({"kind": "Status", "apiVersion": "v1", "status": "Success"}),
),
);
result.unwrap();
assert_eq!(request.method(), http::Method::DELETE);
assert_eq!(
request.uri().path(),
"/apis/apps/v1/namespaces/gtc-zain/deployments/gtc-worker-a"
);
}
/// A 404 answer to the DELETE is treated as success.
#[tokio::test]
async fn delete_of_an_absent_object_is_ok() {
let (client, mut handle) = mock_client();
let cluster = KubeCluster::new(client);
let object = worker_object_ref();
let (result, _request) = tokio::join!(
cluster.delete(&object),
respond_json(
&mut handle,
404,
json!({
"kind": "Status",
"apiVersion": "v1",
"status": "Failure",
"message": "deployments.apps \"gtc-worker-a\" not found",
"reason": "NotFound",
"code": 404,
}),
),
);
result.unwrap();
}
/// Any rejection other than 404 (here 403) surfaces as `K8sClusterError::Api`.
#[tokio::test]
async fn delete_surfaces_non_404_api_rejections() {
let (client, mut handle) = mock_client();
let cluster = KubeCluster::new(client);
let object = worker_object_ref();
let (result, _request) = tokio::join!(
cluster.delete(&object),
respond_json(
&mut handle,
403,
json!({
"kind": "Status",
"apiVersion": "v1",
"status": "Failure",
"message": "forbidden",
"reason": "Forbidden",
"code": 403,
}),
),
);
let err = result.unwrap_err();
assert!(
matches!(err, K8sClusterError::Api(ref msg) if msg.contains("forbidden")),
"got {err:?}"
);
}
/// The rollout counters are copied from the Deployment's metadata and
/// status, and the request is a GET on the Deployment URL.
#[tokio::test]
async fn get_rollout_status_reads_generation_and_available_replicas() {
let (client, mut handle) = mock_client();
let cluster = KubeCluster::new(client);
let object = worker_object_ref();
let (result, request) = tokio::join!(
cluster.get_rollout_status(&object),
respond_json(
&mut handle,
200,
json!({
"apiVersion": "apps/v1",
"kind": "Deployment",
"metadata": {"name": "gtc-worker-a", "namespace": "gtc-zain", "generation": 3},
"spec": {"replicas": 1},
"status": {
"observedGeneration": 3,
"replicas": 1,
"updatedReplicas": 1,
"availableReplicas": 1,
},
}),
),
);
let status = result.unwrap();
assert_eq!(status.generation, 3);
assert_eq!(status.observed_generation, Some(3));
assert_eq!(status.replicas, 1);
assert_eq!(status.updated_replicas, 1);
assert_eq!(status.available_replicas, 1);
assert!(
status.is_complete(1),
"observed caught up + the updated replica available, none lingering"
);
assert_eq!(request.method(), http::Method::GET);
assert_eq!(
request.uri().path(),
"/apis/apps/v1/namespaces/gtc-zain/deployments/gtc-worker-a"
);
}
/// A Deployment without a `status` block yields zero counters and no
/// observed generation, and is not reported as complete.
#[tokio::test]
async fn get_rollout_status_treats_missing_status_as_not_yet_available() {
let (client, mut handle) = mock_client();
let cluster = KubeCluster::new(client);
let object = worker_object_ref();
let (result, _request) = tokio::join!(
cluster.get_rollout_status(&object),
respond_json(
&mut handle,
200,
json!({
"apiVersion": "apps/v1",
"kind": "Deployment",
"metadata": {"name": "gtc-worker-a", "namespace": "gtc-zain", "generation": 1},
"spec": {"replicas": 1},
}),
),
);
let status = result.unwrap();
assert_eq!(status.observed_generation, None);
assert_eq!(status.replicas, 0);
assert_eq!(status.updated_replicas, 0);
assert_eq!(status.available_replicas, 0);
assert!(
!status.is_complete(1),
"a Deployment with no status yet is not ready"
);
}
/// `who_am_i` POSTs a SelfSubjectReview and returns the username from the
/// response status.
#[tokio::test]
async fn who_am_i_resolves_the_cluster_identity() {
let (client, mut handle) = mock_client();
let validator = KubeValidatorClient::new(client);
let (result, request) = tokio::join!(
validator.who_am_i(),
respond_json(
&mut handle,
201,
json!({
"apiVersion": "authentication.k8s.io/v1",
"kind": "SelfSubjectReview",
"metadata": {},
"status": {"userInfo": {
"username": "system:serviceaccount:gtc-zain:greentic-deployer",
}},
}),
),
);
assert_eq!(
result.unwrap(),
ClusterIdentity {
user: "system:serviceaccount:gtc-zain:greentic-deployer".into()
}
);
assert_eq!(request.method(), http::Method::POST);
assert_eq!(
request.uri().path(),
"/apis/authentication.k8s.io/v1/selfsubjectreviews"
);
}
/// A SelfSubjectReview response without a status/username is an error.
#[tokio::test]
async fn who_am_i_without_identity_fails() {
let (client, mut handle) = mock_client();
let validator = KubeValidatorClient::new(client);
let (result, _request) = tokio::join!(
validator.who_am_i(),
respond_json(
&mut handle,
201,
json!({
"apiVersion": "authentication.k8s.io/v1",
"kind": "SelfSubjectReview",
"metadata": {},
}),
),
);
let err = result.unwrap_err();
assert!(
matches!(err, K8sClientError::ApiRejected(ref msg)
if msg.contains("no user identity")),
"got {err:?}"
);
}
/// Builds a SelfSubjectAccessReview response body with the given `allowed`
/// flag and, when supplied, a `reason`.
fn ssar_response(allowed: bool, reason: Option<&str>) -> Value {
let mut status = json!({"allowed": allowed});
if let Some(reason) = reason {
status["reason"] = json!(reason);
}
json!({
"apiVersion": "authorization.k8s.io/v1",
"kind": "SelfSubjectAccessReview",
"metadata": {},
"spec": {},
"status": status,
})
}
/// Two operations produce two POSTs in order, each probing exactly the
/// declared namespace/group/resource/verb, and the decisions come back in
/// the same order.
#[tokio::test]
async fn review_access_sends_one_ssar_per_operation_in_order() {
let (client, mut handle) = mock_client();
let validator = KubeValidatorClient::new(client);
let operations = [
K8sOperation {
group: "apps",
resource: "deployments",
verb: "create",
},
K8sOperation {
group: "",
resource: "services",
verb: "delete",
},
];
let respond_both = async {
let first = respond_json(&mut handle, 201, ssar_response(true, None)).await;
let second =
respond_json(&mut handle, 201, ssar_response(false, Some("RBAC: no"))).await;
(first, second)
};
let (result, (first, second)) = tokio::join!(
validator.review_access("gtc-zain", &operations),
respond_both
);
let decisions = result.unwrap();
assert_eq!(decisions.len(), 2);
assert_eq!(decisions[0].operation, operations[0]);
assert_eq!(decisions[0].decision, AccessDecision::Allowed);
assert_eq!(decisions[1].operation, operations[1]);
assert_eq!(
decisions[1].decision,
AccessDecision::Denied("RBAC: no".to_string())
);
for request in [&first, &second] {
assert_eq!(request.method(), http::Method::POST);
assert_eq!(
request.uri().path(),
"/apis/authorization.k8s.io/v1/selfsubjectaccessreviews"
);
}
let first_body = request_body_json(first).await;
assert_eq!(
first_body["spec"]["resourceAttributes"],
json!({
"namespace": "gtc-zain",
"group": "apps",
"resource": "deployments",
"verb": "create",
}),
"the SSAR must probe the exact declared operation"
);
let second_body = request_body_json(second).await;
assert_eq!(
second_body["spec"]["resourceAttributes"]["group"],
json!("")
);
assert_eq!(
second_body["spec"]["resourceAttributes"]["verb"],
json!("delete")
);
}
/// A review response without `status` is an error, never a decision.
#[tokio::test]
async fn review_access_without_status_fails_closed() {
let (client, mut handle) = mock_client();
let validator = KubeValidatorClient::new(client);
let operations = [K8sOperation {
group: "apps",
resource: "deployments",
verb: "get",
}];
let (result, _request) = tokio::join!(
validator.review_access("gtc-zain", &operations),
respond_json(
&mut handle,
201,
json!({
"apiVersion": "authorization.k8s.io/v1",
"kind": "SelfSubjectAccessReview",
"metadata": {},
"spec": {},
}),
),
);
let err = result.unwrap_err();
assert!(
matches!(err, K8sClientError::ApiRejected(ref msg) if msg.contains("no status")),
"a status-less review must never authorize, got {err:?}"
);
}
/// A denial without a reason is reported with the placeholder text
/// `no reason supplied`.
#[tokio::test]
async fn review_access_denied_without_reason_gets_a_placeholder() {
let (client, mut handle) = mock_client();
let validator = KubeValidatorClient::new(client);
let operations = [K8sOperation {
group: "",
resource: "configmaps",
verb: "patch",
}];
let (result, _request) = tokio::join!(
validator.review_access("gtc-zain", &operations),
respond_json(&mut handle, 201, ssar_response(false, None)),
);
let decisions = result.unwrap();
assert_eq!(
decisions[0].decision,
AccessDecision::Denied("no reason supplied".to_string())
);
}
/// A `Some` token ends up in `auth_info.token` (presence only is checked).
#[test]
fn apply_bound_token_sets_token_when_some() {
let mut cfg = kube::Config::new("https://example.invalid/".parse().unwrap());
assert!(cfg.auth_info.token.is_none());
apply_bound_token(&mut cfg, Some("tok"));
assert!(cfg.auth_info.token.is_some());
}
/// A `None` token leaves `auth_info.token` unset.
#[test]
fn apply_bound_token_leaves_none_when_none() {
let mut cfg = kube::Config::new("https://example.invalid/".parse().unwrap());
apply_bound_token(&mut cfg, None);
assert!(cfg.auth_info.token.is_none());
}
/// When the live object is labelled for another environment, `apply` fails
/// with `OwnershipConflict`; only the GET is answered by the mock.
#[tokio::test]
async fn apply_rejects_a_foreign_owned_object() {
let (client, mut handle) = mock_client();
let cluster = KubeCluster::new(client);
let manifest = deployment_manifest();
let existing = json!({
"apiVersion": "apps/v1",
"kind": "Deployment",
"metadata": {
"name": "gtc-worker-a",
"namespace": "gtc-zain",
"labels": {"greentic.ai/env": "other-env"},
},
});
let respond = async {
respond_json(&mut handle, 200, existing).await
};
let (result, _get_request) = tokio::join!(cluster.apply(&manifest), respond);
let err = result.unwrap_err();
assert!(
matches!(
err,
K8sClusterError::OwnershipConflict {
ref existing_env,
ref incoming_env,
..
} if existing_env == "other-env" && incoming_env == "gtc-zain"
),
"expected OwnershipConflict, got {err:?}"
);
}
/// When the live object carries the same environment label, `apply` goes
/// on to send the PATCH.
#[tokio::test]
async fn apply_proceeds_when_existing_object_is_same_env() {
let (client, mut handle) = mock_client();
let cluster = KubeCluster::new(client);
let manifest = deployment_manifest();
let existing = json!({
"apiVersion": "apps/v1",
"kind": "Deployment",
"metadata": {
"name": "gtc-worker-a",
"namespace": "gtc-zain",
"labels": {"greentic.ai/env": "gtc-zain"},
},
});
let respond = async {
let _get = respond_json(&mut handle, 200, existing).await;
respond_json(&mut handle, 200, manifest.clone()).await
};
let (result, patch_request) = tokio::join!(cluster.apply(&manifest), respond);
result.unwrap();
assert_eq!(patch_request.method(), http::Method::PATCH);
}
}