use std::time::Duration;
use async_trait::async_trait;
use greentic_deploy_spec::{DeploymentId, Environment, Revision, RevisionId};
use serde_json::Value;
use tokio::time::{Instant, sleep};
use super::K8sDeployerHandler;
use super::cluster::{K8sCluster, K8sClusterError, ObjectRef};
use super::manifests::{
K8sParams, has_cluster_presence, render_runtime_config_map, render_worker_manifests,
};
use crate::env_packs::deployer::{
ArchiveOutcome, Deployer, DeployerError, DrainOutcome, StageOutcome, TrafficSplitOutcome,
WarmOutcome, enforce_split_invariants, require_revision,
};
use crate::env_packs::render::ManifestRenderer;
/// Converts a cluster-layer failure into the deployer's `Provider` error,
/// carrying the cluster error's `Display` text as the message.
fn provider(err: K8sClusterError) -> DeployerError {
    let message = err.to_string();
    DeployerError::Provider(message)
}
/// Builds the [`K8sParams`] for `env` from the optional `answers` document.
///
/// # Errors
///
/// Any failure reported by `K8sParams::from_answers` is surfaced as
/// `DeployerError::Provider` with the message prefixed by `invalid answers: `.
fn params_from_answers(
    env: &Environment,
    answers: Option<&Value>,
) -> Result<K8sParams, DeployerError> {
    match K8sParams::from_answers(env, answers) {
        Ok(params) => Ok(params),
        Err(e) => Err(DeployerError::Provider(format!("invalid answers: {e}"))),
    }
}
/// Default time budget for a warm rollout, used when the environment override
/// is absent or cannot be parsed.
const WARM_ROLLOUT_TIMEOUT: Duration = Duration::from_secs(300);
/// Name of the environment variable that overrides the warm rollout timeout.
/// Its value is read as a whole number of seconds.
const WARM_ROLLOUT_TIMEOUT_ENV: &str = "GREENTIC_K8S_WARM_READY_TIMEOUT_SECS";
/// Pause between two consecutive rollout status polls during a warm.
const WARM_ROLLOUT_POLL_INTERVAL: Duration = Duration::from_secs(2);

/// Resolves the warm rollout timeout.
///
/// Reads [`WARM_ROLLOUT_TIMEOUT_ENV`]; when it is set and its trimmed value
/// parses as a `u64`, that many seconds are returned. In every other case
/// (unset, not valid Unicode, not a non-negative integer, out of range) the
/// default [`WARM_ROLLOUT_TIMEOUT`] is returned.
fn warm_rollout_timeout() -> Duration {
    match std::env::var(WARM_ROLLOUT_TIMEOUT_ENV) {
        Ok(raw) => match raw.trim().parse::<u64>() {
            Ok(secs) => Duration::from_secs(secs),
            Err(_) => WARM_ROLLOUT_TIMEOUT,
        },
        Err(_) => WARM_ROLLOUT_TIMEOUT,
    }
}
/// Polls `deployment`'s rollout status until it reports complete for
/// `desired_replicas`, or until `timeout` has elapsed.
///
/// The status is always fetched at least once. After every incomplete poll the
/// elapsed time is checked against `timeout`; if the budget is spent the
/// function fails, otherwise it sleeps for `poll_interval` and polls again.
///
/// Elapsed time is measured from a start instant rather than compared against
/// a precomputed `start + timeout` deadline. That addition panics on overflow,
/// and `timeout` may come from an operator-supplied environment variable
/// holding an arbitrarily large number of seconds.
///
/// # Errors
///
/// * `DeployerError::Provider` wrapping the cluster error when a status query
///   fails.
/// * `DeployerError::Provider` with a diagnostic summary of the last observed
///   status when the rollout is still incomplete after `timeout`.
async fn wait_for_worker_rollout(
    cluster: &dyn K8sCluster,
    deployment: &ObjectRef,
    desired_replicas: i32,
    timeout: Duration,
    poll_interval: Duration,
) -> Result<(), DeployerError> {
    let started = Instant::now();
    loop {
        let status = cluster
            .get_rollout_status(deployment)
            .await
            .map_err(provider)?;
        if status.is_complete(desired_replicas) {
            return Ok(());
        }
        // Same condition as `now >= started + timeout`, minus the overflow panic.
        if started.elapsed() >= timeout {
            return Err(DeployerError::Provider(format!(
                "worker `{deployment}` did not become ready within {}s \
                 (observedGeneration {:?}/{}, updatedReplicas {}/{}, \
                 availableReplicas {}/{}, lingering old replicas {})",
                timeout.as_secs(),
                status.observed_generation,
                status.generation,
                status.updated_replicas,
                desired_replicas,
                status.available_replicas,
                desired_replicas,
                // Replicas not yet on the updated template; clamped so a
                // transiently inconsistent status never prints a negative.
                (status.replicas - status.updated_replicas).max(0),
            )));
        }
        sleep(poll_interval).await;
    }
}
impl K8sDeployerHandler {
    /// Looks up the revision with `revision_id` in `env.revisions`.
    ///
    /// Returns the first match, or `None` when no revision carries that id.
    fn revision(env: &Environment, revision_id: RevisionId) -> Option<&Revision> {
        env.revisions.iter().find(|r| r.revision_id == revision_id)
    }

    /// Applies every manifest to the cluster, in slice order.
    ///
    /// # Errors
    ///
    /// Stops at the first failing `apply` and returns it as
    /// `DeployerError::Provider`; manifests after the failing one are not
    /// applied.
    async fn apply_all(&self, manifests: &[Value]) -> Result<(), DeployerError> {
        for manifest in manifests {
            self.cluster.apply(manifest).await.map_err(provider)?;
        }
        Ok(())
    }

    /// Drives the cluster towards the state described by `env`.
    ///
    /// Steps:
    /// 1. Render the desired manifests with `render_environment`.
    /// 2. Validate `answers` into [`K8sParams`]. This happens before the first
    ///    cluster call, so invalid answers never leave the cluster partially
    ///    mutated.
    /// 3. Apply every desired manifest, recording its [`ObjectRef`].
    /// 4. For each revision whose lifecycle has no cluster presence, render its
    ///    worker manifests and delete the corresponding objects.
    ///
    /// Returns a [`ReconcileReport`] listing the applied and pruned objects.
    ///
    /// # Errors
    ///
    /// `DeployerError::Provider` when rendering fails, when `answers` are
    /// invalid, when a manifest cannot be turned into an [`ObjectRef`], or when
    /// a cluster `apply`/`delete` call fails. The first failure aborts the run.
    pub async fn reconcile(
        &self,
        env: &Environment,
        answers: Option<&Value>,
    ) -> Result<ReconcileReport, DeployerError> {
        let desired = self
            .render_environment(env, answers)
            .map_err(|e| DeployerError::Provider(e.to_string()))?;
        let params = params_from_answers(env, answers)?;

        let mut applied = Vec::with_capacity(desired.len());
        for manifest in &desired {
            self.cluster.apply(manifest).await.map_err(provider)?;
            applied.push(ObjectRef::from_manifest(manifest).map_err(provider)?);
        }

        let mut pruned = Vec::new();
        for revision in &env.revisions {
            if !has_cluster_presence(revision.lifecycle) {
                for manifest in render_worker_manifests(env, revision, &params) {
                    let object = ObjectRef::from_manifest(&manifest).map_err(provider)?;
                    self.cluster.delete(&object).await.map_err(provider)?;
                    pruned.push(object);
                }
            }
        }

        Ok(ReconcileReport { applied, pruned })
    }
}
/// Summary of a single `K8sDeployerHandler::reconcile` run.
#[derive(Debug, Default, Clone, PartialEq, Eq, serde::Serialize)]
pub struct ReconcileReport {
/// References to the objects whose desired manifests were applied, in the
/// order they were applied.
pub applied: Vec<ObjectRef>,
/// References to the worker objects for which a delete was issued because
/// their revision's lifecycle has no cluster presence.
pub pruned: Vec<ObjectRef>,
}
#[async_trait]
impl Deployer for K8sDeployerHandler {
async fn stage_revision(
&self,
env: &Environment,
revision_id: RevisionId,
) -> Result<StageOutcome, DeployerError> {
require_revision(env, revision_id)?;
Ok(StageOutcome::default())
}
async fn warm_revision(
&self,
env: &Environment,
revision_id: RevisionId,
answers: Option<&Value>,
) -> Result<WarmOutcome, DeployerError> {
require_revision(env, revision_id)?;
let revision = Self::revision(env, revision_id).expect("require_revision passed");
let params = params_from_answers(env, answers)?;
let manifests = render_worker_manifests(env, revision, ¶ms);
self.apply_all(&manifests).await?;
let deployment = &manifests[0];
let desired_replicas = deployment
.pointer("/spec/replicas")
.and_then(Value::as_i64)
.unwrap_or(1) as i32;
let deployment_ref = ObjectRef::from_manifest(deployment).map_err(provider)?;
wait_for_worker_rollout(
self.cluster.as_ref(),
&deployment_ref,
desired_replicas,
warm_rollout_timeout(),
WARM_ROLLOUT_POLL_INTERVAL,
)
.await?;
Ok(WarmOutcome::default())
}
async fn drain_revision(
&self,
env: &Environment,
revision_id: RevisionId,
) -> Result<DrainOutcome, DeployerError> {
require_revision(env, revision_id)?;
Ok(DrainOutcome::default())
}
async fn archive_revision(
&self,
env: &Environment,
revision_id: RevisionId,
answers: Option<&Value>,
) -> Result<ArchiveOutcome, DeployerError> {
require_revision(env, revision_id)?;
let revision = Self::revision(env, revision_id).expect("require_revision passed");
let params = params_from_answers(env, answers)?;
for manifest in render_worker_manifests(env, revision, ¶ms) {
let object = ObjectRef::from_manifest(&manifest).map_err(provider)?;
self.cluster.delete(&object).await.map_err(provider)?;
}
Ok(ArchiveOutcome::default())
}
async fn apply_traffic_split(
&self,
env: &Environment,
deployment_id: DeploymentId,
answers: Option<&Value>,
) -> Result<TrafficSplitOutcome, DeployerError> {
let outcome = enforce_split_invariants(env, deployment_id)?;
let params = params_from_answers(env, answers)?;
self.cluster
.apply(&render_runtime_config_map(env, ¶ms))
.await
.map_err(provider)?;
Ok(outcome)
}
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::env_packs::deployer::conformance::build_fixture_env;
    use crate::env_packs::deployer::run_conformance;
    use crate::env_packs::k8s::cluster::{InMemoryCluster, K8sCluster, RolloutStatus};
    use crate::env_packs::k8s::manifests::{RUNTIME_CONFIG_MAP_NAME, worker_name};

    /// Builds a handler backed by a fresh in-memory cluster and returns both,
    /// so tests can inspect what the handler wrote.
    fn handler_with_fake() -> (K8sDeployerHandler, Arc<InMemoryCluster>) {
        let cluster = Arc::new(InMemoryCluster::default());
        (K8sDeployerHandler::with_cluster(cluster.clone()), cluster)
    }

    /// Fake cluster whose rollout status reports zero available replicas for
    /// the first `ready_after` polls and one available replica afterwards.
    /// `polls` counts the status queries issued so far.
    #[derive(Debug)]
    struct ScriptedRolloutCluster {
        ready_after: usize,
        polls: std::sync::atomic::AtomicUsize,
    }

    #[async_trait]
    impl K8sCluster for ScriptedRolloutCluster {
        /// Accepts every manifest without recording it.
        async fn apply(&self, _manifest: &Value) -> Result<(), K8sClusterError> {
            Ok(())
        }

        /// Accepts every delete without doing anything.
        async fn delete(&self, _object: &ObjectRef) -> Result<(), K8sClusterError> {
            Ok(())
        }

        /// Returns a one-replica status whose availability flips to 1 once
        /// the zero-based poll index reaches `ready_after`.
        async fn get_rollout_status(
            &self,
            _deployment: &ObjectRef,
        ) -> Result<RolloutStatus, K8sClusterError> {
            let n = self.polls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
            let available = if n >= self.ready_after { 1 } else { 0 };
            Ok(RolloutStatus {
                generation: 1,
                observed_generation: Some(1),
                replicas: 1,
                updated_replicas: 1,
                available_replicas: available,
            })
        }
    }

    /// A fixed Deployment reference used by the rollout-wait tests.
    fn worker_deployment_ref() -> ObjectRef {
        ObjectRef {
            api_version: "apps/v1".into(),
            kind: "Deployment".into(),
            namespace: Some("gtc-local".into()),
            name: "gtc-worker-x".into(),
        }
    }

    /// The wait keeps polling until the scripted cluster reports the replica
    /// available, then resolves successfully.
    #[tokio::test(start_paused = true)]
    async fn warm_rollout_wait_resolves_once_the_worker_becomes_available() {
        let cluster = ScriptedRolloutCluster {
            ready_after: 3,
            polls: std::sync::atomic::AtomicUsize::new(0),
        };
        wait_for_worker_rollout(
            &cluster,
            &worker_deployment_ref(),
            1,
            Duration::from_secs(60),
            Duration::from_secs(2),
        )
        .await
        .expect("rollout completes once the replica is available");
        assert!(
            cluster.polls.load(std::sync::atomic::Ordering::SeqCst) >= 4,
            "must keep polling until the worker reports available"
        );
    }

    /// A worker that never becomes available yields a `Provider` timeout error
    /// whose message carries the last observed availability.
    #[tokio::test(start_paused = true)]
    async fn warm_rollout_wait_fails_when_the_worker_never_becomes_ready() {
        let cluster = ScriptedRolloutCluster {
            ready_after: usize::MAX,
            polls: std::sync::atomic::AtomicUsize::new(0),
        };
        let err = wait_for_worker_rollout(
            &cluster,
            &worker_deployment_ref(),
            1,
            Duration::from_secs(10),
            Duration::from_secs(2),
        )
        .await
        .unwrap_err();
        match err {
            DeployerError::Provider(msg) => {
                assert!(msg.contains("did not become ready"), "msg: {msg}");
                assert!(msg.contains("availableReplicas 0/1"), "msg: {msg}");
            }
            other => panic!("expected a Provider timeout error, got {other:?}"),
        }
    }

    /// The handler passes the shared deployer conformance suite.
    #[tokio::test]
    async fn k8s_deployer_passes_conformance() {
        let (handler, _cluster) = handler_with_fake();
        run_conformance(&handler)
            .await
            .expect("K8s deployer satisfies the Phase D conformance contract");
    }

    /// Warming creates exactly a Deployment and a Service named after the
    /// worker, and warming again does not add objects.
    #[tokio::test]
    async fn warm_applies_the_worker_deployment_and_service() {
        let (handler, cluster) = handler_with_fake();
        let env = build_fixture_env();
        let rev = &env.revisions[0];
        handler
            .warm_revision(&env, rev.revision_id, None)
            .await
            .unwrap();
        let objects = cluster.objects();
        assert_eq!(objects.len(), 2, "Deployment + Service");
        let name = worker_name(rev);
        let kinds: Vec<(String, String)> = objects
            .keys()
            .map(|o| (o.kind.clone(), o.name.clone()))
            .collect();
        assert!(kinds.contains(&("Deployment".into(), name.clone())));
        assert!(kinds.contains(&("Service".into(), name)));
        // A second warm must upsert, not duplicate.
        handler
            .warm_revision(&env, rev.revision_id, None)
            .await
            .unwrap();
        assert_eq!(cluster.objects().len(), 2);
    }

    /// A `namespace` answer decides where the worker objects are created.
    #[tokio::test]
    async fn warm_honors_the_namespace_answer() {
        let (handler, cluster) = handler_with_fake();
        let env = build_fixture_env();
        let rev = &env.revisions[0];
        let answers = serde_json::json!({ "namespace": "custom-ns" });
        handler
            .warm_revision(&env, rev.revision_id, Some(&answers))
            .await
            .unwrap();
        let namespaces: Vec<String> = cluster
            .objects()
            .keys()
            .filter_map(|o| o.namespace.clone())
            .collect();
        assert!(
            !namespaces.is_empty() && namespaces.iter().all(|ns| ns == "custom-ns"),
            "worker objects land in the answer namespace, got {namespaces:?}"
        );
    }

    /// Answers that fail validation are rejected with a `Provider` error and
    /// nothing is written to the cluster.
    #[tokio::test]
    async fn warm_rejects_invalid_answers_before_touching_the_cluster() {
        let (handler, cluster) = handler_with_fake();
        let env = build_fixture_env();
        let rev = &env.revisions[0];
        let answers = serde_json::json!({ "unknown_key": "x" });
        let err = handler
            .warm_revision(&env, rev.revision_id, Some(&answers))
            .await
            .unwrap_err();
        match err {
            DeployerError::Provider(msg) => assert!(msg.contains("invalid answers"), "msg: {msg}"),
            other => panic!("expected Provider, got {other:?}"),
        }
        assert!(
            cluster.objects().is_empty(),
            "invalid answers must not touch the cluster"
        );
    }

    /// Archiving removes what warming created, and archiving an already
    /// archived revision still succeeds.
    #[tokio::test]
    async fn archive_removes_the_worker_objects_and_tolerates_absence() {
        let (handler, cluster) = handler_with_fake();
        let env = build_fixture_env();
        let rev = &env.revisions[0];
        handler
            .warm_revision(&env, rev.revision_id, None)
            .await
            .unwrap();
        assert_eq!(cluster.objects().len(), 2);
        handler
            .archive_revision(&env, rev.revision_id, None)
            .await
            .unwrap();
        assert!(cluster.objects().is_empty());
        // The objects are gone already; a repeat archive must not fail.
        handler
            .archive_revision(&env, rev.revision_id, None)
            .await
            .unwrap();
    }

    /// Applying a traffic split writes a single runtime ConfigMap whose
    /// payload equals the serialized materialized runtime config.
    #[tokio::test]
    async fn traffic_split_upserts_the_runtime_config_map() {
        let (handler, cluster) = handler_with_fake();
        let env = build_fixture_env();
        let dep = env.bundles[0].deployment_id;
        let outcome = handler.apply_traffic_split(&env, dep, None).await.unwrap();
        assert_eq!(outcome.applied_deployment_id, dep);
        let objects = cluster.objects();
        assert_eq!(objects.len(), 1);
        let (object, manifest) = objects.iter().next().unwrap();
        assert_eq!(object.kind, "ConfigMap");
        assert_eq!(object.name, RUNTIME_CONFIG_MAP_NAME);
        let payload = manifest["data"]["runtime-config.json"].as_str().unwrap();
        let expected = serde_json::to_string(
            &crate::environment::runtime_config::materialize_runtime_config(&env),
        )
        .unwrap();
        assert_eq!(payload, expected);
    }

    /// Reconcile applies every desired manifest, reports two pruned objects
    /// per revision without cluster presence, and a second run leaves the
    /// cluster unchanged.
    #[tokio::test]
    async fn reconcile_applies_desired_state_and_is_idempotent() {
        let (handler, cluster) = handler_with_fake();
        let env = build_fixture_env();
        let report = handler.reconcile(&env, None).await.unwrap();
        let desired = handler.render_environment(&env, None).unwrap();
        assert_eq!(report.applied.len(), desired.len());
        assert_eq!(cluster.objects().len(), desired.len());
        let absent = env
            .revisions
            .iter()
            .filter(|r| !has_cluster_presence(r.lifecycle))
            .count();
        assert_eq!(report.pruned.len(), absent * 2);
        let before = cluster.objects();
        let report2 = handler.reconcile(&env, None).await.unwrap();
        assert_eq!(report2.applied.len(), desired.len());
        assert_eq!(cluster.objects(), before, "reconcile is idempotent");
    }

    /// Worker objects left on the cluster for a revision that no longer has
    /// cluster presence are removed by reconcile.
    #[tokio::test]
    async fn reconcile_prunes_workers_left_over_from_a_now_absent_revision() {
        let (handler, cluster) = handler_with_fake();
        let env = build_fixture_env();
        let params = K8sParams::for_env(&env);
        let absent = env
            .revisions
            .iter()
            .find(|r| !has_cluster_presence(r.lifecycle))
            .expect("fixture has a non-present revision");
        // Seed the cluster with the leftovers reconcile is expected to prune.
        for manifest in render_worker_manifests(&env, absent, &params) {
            cluster.apply(&manifest).await.unwrap();
        }
        let lingering = worker_name(absent);
        assert!(
            cluster.objects().keys().any(|o| o.name == lingering),
            "precondition: the absent revision's workers are on the cluster"
        );
        handler.reconcile(&env, None).await.unwrap();
        assert!(
            !cluster.objects().keys().any(|o| o.name == lingering),
            "reconcile prunes the now-absent revision's workers"
        );
    }

    /// An unknown revision and an invalid traffic split are both rejected
    /// without any object being written to the cluster.
    #[tokio::test]
    async fn preconditions_reject_before_any_cluster_call() {
        let (handler, cluster) = handler_with_fake();
        let mut env = build_fixture_env();
        let unknown = RevisionId(ulid::Ulid::from(0xFFFF_u128));
        let err = handler
            .warm_revision(&env, unknown, None)
            .await
            .unwrap_err();
        assert!(matches!(err, DeployerError::RevisionNotFound { .. }));
        // Corrupt one weight so the split no longer satisfies its invariants.
        env.traffic_splits[0].entries[0].weight_bps = 1;
        let dep = env.bundles[0].deployment_id;
        let err = handler
            .apply_traffic_split(&env, dep, None)
            .await
            .unwrap_err();
        assert!(matches!(err, DeployerError::InvalidSplit { .. }));
        assert!(
            cluster.objects().is_empty(),
            "rejected preconditions must not touch the cluster"
        );
    }

    /// A default handler (no cluster configured) reports a `Provider` error on
    /// warm, while the revision precondition still takes precedence.
    #[tokio::test]
    async fn unconfigured_cluster_surfaces_a_provider_error() {
        let handler = K8sDeployerHandler::default();
        let env = build_fixture_env();
        let err = handler
            .warm_revision(&env, env.revisions[0].revision_id, None)
            .await
            .unwrap_err();
        match err {
            DeployerError::Provider(msg) => {
                assert!(msg.contains("no Kubernetes API client"), "msg: {msg}");
            }
            other => panic!("expected Provider, got {other:?}"),
        }
        let unknown = RevisionId(ulid::Ulid::from(0xFFFF_u128));
        assert!(matches!(
            handler
                .warm_revision(&env, unknown, None)
                .await
                .unwrap_err(),
            DeployerError::RevisionNotFound { .. }
        ));
    }
}