use async_trait::async_trait;
use greentic_deploy_spec::{DeploymentId, Environment, Revision, RevisionId};
use serde_json::Value;
use super::K8sDeployerHandler;
use super::cluster::{K8sClusterError, ObjectRef};
use super::manifests::{K8sParams, render_runtime_config_map, render_worker_manifests};
use crate::env_packs::deployer::{
ArchiveOutcome, Deployer, DeployerError, DrainOutcome, StageOutcome, TrafficSplitOutcome,
WarmOutcome, enforce_split_invariants, require_revision,
};
/// Wraps a cluster-layer failure in [`DeployerError::Provider`], using the error's
/// `to_string()` rendering as the message.
fn provider(err: K8sClusterError) -> DeployerError {
    let message = err.to_string();
    DeployerError::Provider(message)
}
impl K8sDeployerHandler {
    /// Returns the first entry of `env.revisions` whose `revision_id` equals
    /// `revision_id`, or `None` when no entry matches.
    fn revision(env: &Environment, revision_id: RevisionId) -> Option<&Revision> {
        for candidate in &env.revisions {
            if candidate.revision_id == revision_id {
                return Some(candidate);
            }
        }
        None
    }

    /// Applies each manifest to the cluster in slice order.
    ///
    /// Stops at the first failing apply and returns it as
    /// [`DeployerError::Provider`]; manifests after the failing one are not sent.
    async fn apply_all(&self, manifests: &[Value]) -> Result<(), DeployerError> {
        for manifest in manifests.iter() {
            let applied = self.cluster.apply(manifest).await;
            applied.map_err(provider)?;
        }
        Ok(())
    }
}
#[async_trait]
impl Deployer for K8sDeployerHandler {
async fn stage_revision(
&self,
env: &Environment,
revision_id: RevisionId,
) -> Result<StageOutcome, DeployerError> {
require_revision(env, revision_id)?;
Ok(StageOutcome::default())
}
async fn warm_revision(
&self,
env: &Environment,
revision_id: RevisionId,
) -> Result<WarmOutcome, DeployerError> {
require_revision(env, revision_id)?;
let revision = Self::revision(env, revision_id).expect("require_revision passed");
let params = K8sParams::for_env(env);
self.apply_all(&render_worker_manifests(env, revision, ¶ms))
.await?;
Ok(WarmOutcome::default())
}
async fn drain_revision(
&self,
env: &Environment,
revision_id: RevisionId,
) -> Result<DrainOutcome, DeployerError> {
require_revision(env, revision_id)?;
Ok(DrainOutcome::default())
}
async fn archive_revision(
&self,
env: &Environment,
revision_id: RevisionId,
) -> Result<ArchiveOutcome, DeployerError> {
require_revision(env, revision_id)?;
let revision = Self::revision(env, revision_id).expect("require_revision passed");
let params = K8sParams::for_env(env);
for manifest in render_worker_manifests(env, revision, ¶ms) {
let object = ObjectRef::from_manifest(&manifest).map_err(provider)?;
self.cluster.delete(&object).await.map_err(provider)?;
}
Ok(ArchiveOutcome::default())
}
async fn apply_traffic_split(
&self,
env: &Environment,
deployment_id: DeploymentId,
) -> Result<TrafficSplitOutcome, DeployerError> {
let outcome = enforce_split_invariants(env, deployment_id)?;
let params = K8sParams::for_env(env);
self.cluster
.apply(&render_runtime_config_map(env, ¶ms))
.await
.map_err(provider)?;
Ok(outcome)
}
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::env_packs::deployer::conformance::build_fixture_env;
    use crate::env_packs::deployer::run_conformance;
    use crate::env_packs::k8s::cluster::InMemoryCluster;
    use crate::env_packs::k8s::manifests::{RUNTIME_CONFIG_MAP_NAME, worker_name};

    /// Builds a handler backed by a fresh in-memory cluster and returns both, so a
    /// test can drive the handler and then inspect the cluster's objects.
    fn handler_with_fake() -> (K8sDeployerHandler, Arc<InMemoryCluster>) {
        let fake = Arc::new(InMemoryCluster::default());
        let handler = K8sDeployerHandler::with_cluster(fake.clone());
        (handler, fake)
    }

    /// Revision id the tests use to stand for "not present in the environment".
    fn unknown_revision() -> RevisionId {
        RevisionId(ulid::Ulid::from(0xFFFF_u128))
    }

    /// The handler must pass the shared deployer conformance run.
    #[tokio::test]
    async fn k8s_deployer_passes_conformance() {
        let (handler, _cluster) = handler_with_fake();
        let verdict = run_conformance(&handler).await;
        verdict.expect("K8s deployer satisfies the Phase D conformance contract");
    }

    /// Warming creates a Deployment and a Service named after the worker, and
    /// warming a second time leaves the object count unchanged.
    #[tokio::test]
    async fn warm_applies_the_worker_deployment_and_service() {
        let (handler, cluster) = handler_with_fake();
        let env = build_fixture_env();
        let revision = &env.revisions[0];
        let id = revision.revision_id;

        handler.warm_revision(&env, id).await.unwrap();

        let applied = cluster.objects();
        assert_eq!(applied.len(), 2, "Deployment + Service");
        let expected_name = worker_name(revision);
        let has = |kind: &str| {
            applied
                .keys()
                .any(|object| object.kind == kind && object.name == expected_name)
        };
        assert!(has("Deployment"));
        assert!(has("Service"));

        // Second warm: the cluster must still hold exactly the same two objects.
        handler.warm_revision(&env, id).await.unwrap();
        assert_eq!(cluster.objects().len(), 2);
    }

    /// Archiving removes what warming created, and archiving again when nothing
    /// is left still succeeds.
    #[tokio::test]
    async fn archive_removes_the_worker_objects_and_tolerates_absence() {
        let (handler, cluster) = handler_with_fake();
        let env = build_fixture_env();
        let id = env.revisions[0].revision_id;

        handler.warm_revision(&env, id).await.unwrap();
        assert_eq!(cluster.objects().len(), 2);

        handler.archive_revision(&env, id).await.unwrap();
        assert!(cluster.objects().is_empty());

        // Nothing left to delete; the call must still return Ok.
        handler.archive_revision(&env, id).await.unwrap();
    }

    /// Applying a traffic split stores a single ConfigMap whose
    /// `runtime-config.json` payload is the serialized runtime config for `env`.
    #[tokio::test]
    async fn traffic_split_upserts_the_runtime_config_map() {
        let (handler, cluster) = handler_with_fake();
        let env = build_fixture_env();
        let deployment = env.bundles[0].deployment_id;

        let outcome = handler
            .apply_traffic_split(&env, deployment)
            .await
            .unwrap();
        assert_eq!(outcome.applied_deployment_id, deployment);

        let stored = cluster.objects();
        assert_eq!(stored.len(), 1);
        let (object, manifest) = stored.iter().next().unwrap();
        assert_eq!(object.kind, "ConfigMap");
        assert_eq!(object.name, RUNTIME_CONFIG_MAP_NAME);

        let payload = manifest["data"]["runtime-config.json"].as_str().unwrap();
        let runtime_config = crate::environment::runtime_config::materialize_runtime_config(&env);
        let expected = serde_json::to_string(&runtime_config).unwrap();
        assert_eq!(payload, expected);
    }

    /// An unknown revision and an invalid split are both rejected, and neither
    /// rejection leaves anything in the cluster.
    #[tokio::test]
    async fn preconditions_reject_before_any_cluster_call() {
        let (handler, cluster) = handler_with_fake();
        let mut env = build_fixture_env();

        let missing = unknown_revision();
        let warm_err = handler.warm_revision(&env, missing).await.unwrap_err();
        assert!(matches!(warm_err, DeployerError::RevisionNotFound { .. }));

        // Tamper with the first split entry's weight so the split is rejected.
        env.traffic_splits[0].entries[0].weight_bps = 1;
        let deployment = env.bundles[0].deployment_id;
        let split_err = handler
            .apply_traffic_split(&env, deployment)
            .await
            .unwrap_err();
        assert!(matches!(split_err, DeployerError::InvalidSplit { .. }));

        assert!(
            cluster.objects().is_empty(),
            "rejected preconditions must not touch the cluster"
        );
    }

    /// A default-constructed handler reports a provider error when warming a
    /// known revision, while an unknown revision still yields `RevisionNotFound`.
    #[tokio::test]
    async fn unconfigured_cluster_surfaces_a_provider_error() {
        let handler = K8sDeployerHandler::default();
        let env = build_fixture_env();
        let known = env.revisions[0].revision_id;

        let err = handler.warm_revision(&env, known).await.unwrap_err();
        match err {
            DeployerError::Provider(msg) => {
                assert!(msg.contains("no Kubernetes API client"), "msg: {msg}");
            }
            other => panic!("expected Provider, got {other:?}"),
        }

        let missing = unknown_revision();
        let missing_err = handler.warm_revision(&env, missing).await.unwrap_err();
        assert!(matches!(
            missing_err,
            DeployerError::RevisionNotFound { .. }
        ));
    }
}