greentic-deployer-dev 1.1.27501952916

Greentic deployer runtime for plan construction and deployment-pack dispatch
Documentation
//! [`Deployer`] impl for the K8s env-pack.
//!
//! Verbs follow the contract's required order: pure-spec preconditions
//! first (shared helpers — `require_revision`, `enforce_split_invariants`),
//! provider work second. The provider work is "render the deterministic
//! desired state, hand it to the [`K8sCluster`](super::cluster::K8sCluster)
//! seam".
//!
//! **Note:** the Deployer verbs use [`K8sParams::for_env`] sandbox
//! defaults — they have no env-dir access on the trait, so the binding's
//! `answers_ref` is not available here. Threading answers into
//! `warm_revision` / `apply_traffic_split` rides the PR-5.3 orchestration
//! wiring. `op env render` already consumes answers via
//! [`K8sParams::from_answers`].
//!
//! | Verb | Provider side-effect |
//! |---|---|
//! | `stage_revision` | None today. The bundle artifact is delivered to the pod at warm time (delivery mechanism is the apply PR's decision); there is no per-revision registry upload step yet. |
//! | `warm_revision` | Apply the revision's worker Deployment + ClusterIP Service. |
//! | `drain_revision` | None — drain semantics are routing-side. The router stops dispatching NEW sessions when the `TrafficSplit` changes (`apply_traffic_split`); provider resources stay up through the drain window so in-flight sessions finish. Teardown is `archive_revision`'s job. |
//! | `archive_revision` | Delete the worker Deployment + Service (idempotent against absent). |
//! | `apply_traffic_split` | Upsert the runtime-config ConfigMap — the router reloads it and enforces the split in-process. Never a `kubectl rollout`. |
//!
//! Idempotency falls out of the seam's contract: `apply` is a
//! declarative upsert, `delete` of an absent object is `Ok`. The
//! conformance bench runs against an in-memory fake cluster (the verbs +
//! rendering are fully exercised); the real kube-rs-backed seam lands in
//! the K8s apply PR and inherits the same verbs unchanged.

use async_trait::async_trait;
use greentic_deploy_spec::{DeploymentId, Environment, Revision, RevisionId};
use serde_json::Value;

use super::K8sDeployerHandler;
use super::cluster::{K8sClusterError, ObjectRef};
use super::manifests::{K8sParams, render_runtime_config_map, render_worker_manifests};
use crate::env_packs::deployer::{
    ArchiveOutcome, Deployer, DeployerError, DrainOutcome, StageOutcome, TrafficSplitOutcome,
    WarmOutcome, enforce_split_invariants, require_revision,
};

/// Translate a cluster-seam failure into [`DeployerError::Provider`].
///
/// Every verb checks its pure-spec preconditions before touching the seam,
/// so any error coming back from the cluster is a provider-side failure;
/// only its rendered message is carried across.
fn provider(err: K8sClusterError) -> DeployerError {
    let message = err.to_string();
    DeployerError::Provider(message)
}

impl K8sDeployerHandler {
    /// Locate the revision (the caller already passed `require_revision`,
    /// so the lookup is infallible by construction — keep it total anyway).
    fn revision(env: &Environment, revision_id: RevisionId) -> Option<&Revision> {
        env.revisions.iter().find(|r| r.revision_id == revision_id)
    }

    async fn apply_all(&self, manifests: &[Value]) -> Result<(), DeployerError> {
        for manifest in manifests {
            self.cluster.apply(manifest).await.map_err(provider)?;
        }
        Ok(())
    }
}

#[async_trait]
impl Deployer for K8sDeployerHandler {
    /// Check that the revision exists. No cluster work happens at stage
    /// time: the bundle artifact reaches the pod at warm time.
    ///
    /// # Errors
    /// Whatever `require_revision` reports for an unknown revision.
    async fn stage_revision(
        &self,
        env: &Environment,
        revision_id: RevisionId,
    ) -> Result<StageOutcome, DeployerError> {
        require_revision(env, revision_id)?;
        // No cluster work at stage time — see the module table.
        Ok(StageOutcome::default())
    }

    /// Apply the revision's rendered worker manifests (Deployment + Service)
    /// through the cluster seam. Re-warming is a declarative upsert.
    ///
    /// # Errors
    /// `require_revision`'s error for an unknown revision (raised before any
    /// cluster call), or [`DeployerError::Provider`] if the seam rejects an
    /// apply.
    async fn warm_revision(
        &self,
        env: &Environment,
        revision_id: RevisionId,
    ) -> Result<WarmOutcome, DeployerError> {
        require_revision(env, revision_id)?;
        let revision = Self::revision(env, revision_id).expect("require_revision passed");
        // Sandbox defaults: answers are not reachable from the trait (see
        // the module note).
        let params = K8sParams::for_env(env);
        let manifests = render_worker_manifests(env, revision, &params);
        self.apply_all(&manifests).await?;
        Ok(WarmOutcome::default())
    }

    /// Validate the revision only. Drain is routing-side: worker resources
    /// stay up so in-flight sessions can finish, and `archive_revision`
    /// tears them down later.
    ///
    /// # Errors
    /// Whatever `require_revision` reports for an unknown revision.
    async fn drain_revision(
        &self,
        env: &Environment,
        revision_id: RevisionId,
    ) -> Result<DrainOutcome, DeployerError> {
        require_revision(env, revision_id)?;
        // Routing-side only — see the module table. Worker resources stay
        // up so in-flight sessions complete; archive tears them down.
        Ok(DrainOutcome::default())
    }

    /// Delete the objects described by the revision's rendered worker
    /// manifests. Deleting an absent object is `Ok` at the seam, so a retried
    /// archive succeeds.
    ///
    /// # Errors
    /// `require_revision`'s error for an unknown revision, or
    /// [`DeployerError::Provider`] if a manifest cannot be turned into an
    /// object reference or the seam rejects a delete.
    async fn archive_revision(
        &self,
        env: &Environment,
        revision_id: RevisionId,
    ) -> Result<ArchiveOutcome, DeployerError> {
        require_revision(env, revision_id)?;
        let revision = Self::revision(env, revision_id).expect("require_revision passed");
        let params = K8sParams::for_env(env);
        let manifests = render_worker_manifests(env, revision, &params);
        // Resolve every object reference BEFORE the first delete. Otherwise a
        // manifest that cannot be addressed would fail the verb midway,
        // leaving the revision half torn down (and every retry would fail at
        // the same point).
        let objects = manifests
            .iter()
            .map(ObjectRef::from_manifest)
            .collect::<Result<Vec<_>, _>>()
            .map_err(provider)?;
        for object in &objects {
            self.cluster.delete(object).await.map_err(provider)?;
        }
        Ok(ArchiveOutcome::default())
    }

    /// Validate the deployment's traffic split, then upsert the
    /// runtime-config ConfigMap rendered from the environment.
    ///
    /// # Errors
    /// `enforce_split_invariants`' error for an invalid split (raised before
    /// any cluster call), or [`DeployerError::Provider`] if the seam rejects
    /// the apply.
    async fn apply_traffic_split(
        &self,
        env: &Environment,
        deployment_id: DeploymentId,
    ) -> Result<TrafficSplitOutcome, DeployerError> {
        // Preconditions + outcome construction BEFORE any cluster call.
        let outcome = enforce_split_invariants(env, deployment_id)?;
        let params = K8sParams::for_env(env);
        self.cluster
            .apply(&render_runtime_config_map(env, &params))
            .await
            .map_err(provider)?;
        Ok(outcome)
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::env_packs::deployer::conformance::build_fixture_env;
    use crate::env_packs::deployer::run_conformance;
    use crate::env_packs::k8s::cluster::InMemoryCluster;
    use crate::env_packs::k8s::manifests::{RUNTIME_CONFIG_MAP_NAME, worker_name};

    /// A handler backed by a fresh in-memory cluster. The cluster is
    /// returned as well so tests can inspect what the verbs did.
    fn handler_with_fake() -> (K8sDeployerHandler, Arc<InMemoryCluster>) {
        let fake = Arc::new(InMemoryCluster::default());
        let shared = Arc::clone(&fake);
        let handler = K8sDeployerHandler::with_cluster(shared);
        (handler, fake)
    }

    /// Phase D entry gate: the K8s impl must satisfy the shared deployer
    /// contract (per-verb idempotency, typed precondition rejection,
    /// cross-deployment independence, projection consistency).
    #[tokio::test]
    async fn k8s_deployer_passes_conformance() {
        let (handler, _fake) = handler_with_fake();
        run_conformance(&handler)
            .await
            .expect("K8s deployer satisfies the Phase D conformance contract");
    }

    #[tokio::test]
    async fn warm_applies_the_worker_deployment_and_service() {
        let (handler, cluster) = handler_with_fake();
        let env = build_fixture_env();
        let revision = &env.revisions[0];
        let expected_name = worker_name(revision);

        handler.warm_revision(&env, revision.revision_id).await.unwrap();

        let applied = cluster.objects();
        assert_eq!(applied.len(), 2, "Deployment + Service");
        let has_worker = |kind: &str| {
            applied
                .keys()
                .any(|object| object.kind == kind && object.name == expected_name)
        };
        assert!(has_worker("Deployment"));
        assert!(has_worker("Service"));

        // A second warm is a declarative upsert: the object count holds at two.
        handler.warm_revision(&env, revision.revision_id).await.unwrap();
        assert_eq!(cluster.objects().len(), 2);
    }

    #[tokio::test]
    async fn archive_removes_the_worker_objects_and_tolerates_absence() {
        let (handler, cluster) = handler_with_fake();
        let env = build_fixture_env();
        let target = env.revisions[0].revision_id;

        handler.warm_revision(&env, target).await.unwrap();
        assert_eq!(cluster.objects().len(), 2);

        handler.archive_revision(&env, target).await.unwrap();
        assert!(cluster.objects().is_empty());

        // Archiving again with nothing left to delete must still succeed.
        handler.archive_revision(&env, target).await.unwrap();
    }

    #[tokio::test]
    async fn traffic_split_upserts_the_runtime_config_map() {
        let (handler, cluster) = handler_with_fake();
        let env = build_fixture_env();
        let deployment = env.bundles[0].deployment_id;

        let outcome = handler
            .apply_traffic_split(&env, deployment)
            .await
            .unwrap();
        assert_eq!(outcome.applied_deployment_id, deployment);

        let applied = cluster.objects();
        assert_eq!(applied.len(), 1);
        let (object, manifest) = applied
            .iter()
            .next()
            .expect("exactly one object was applied");
        assert_eq!(object.kind, "ConfigMap");
        assert_eq!(object.name, RUNTIME_CONFIG_MAP_NAME);

        // The stored payload must be exactly the serialized runtime-config
        // projection of the environment.
        let expected = serde_json::to_string(
            &crate::environment::runtime_config::materialize_runtime_config(&env),
        )
        .unwrap();
        let payload = manifest["data"]["runtime-config.json"].as_str().unwrap();
        assert_eq!(payload, expected);
    }

    /// Preconditions are checked before the seam is touched: an unknown
    /// revision or an invalid split must leave the cluster empty.
    #[tokio::test]
    async fn preconditions_reject_before_any_cluster_call() {
        let (handler, cluster) = handler_with_fake();
        let mut env = build_fixture_env();

        let missing = RevisionId(ulid::Ulid::from(0xFFFF_u128));
        let warm_err = handler.warm_revision(&env, missing).await.unwrap_err();
        assert!(matches!(warm_err, DeployerError::RevisionNotFound { .. }));

        // Break deployment A's split so its weights no longer sum to 10000.
        env.traffic_splits[0].entries[0].weight_bps = 1;
        let deployment = env.bundles[0].deployment_id;
        let split_err = handler
            .apply_traffic_split(&env, deployment)
            .await
            .unwrap_err();
        assert!(matches!(split_err, DeployerError::InvalidSplit { .. }));

        assert!(
            cluster.objects().is_empty(),
            "rejected preconditions must not touch the cluster"
        );
    }

    /// With no cluster client wired, provider verbs must fail loudly rather
    /// than report success for work that never happened.
    #[tokio::test]
    async fn unconfigured_cluster_surfaces_a_provider_error() {
        let handler = K8sDeployerHandler::default();
        let env = build_fixture_env();
        let known = env.revisions[0].revision_id;

        match handler.warm_revision(&env, known).await.unwrap_err() {
            DeployerError::Provider(msg) => {
                assert!(msg.contains("no Kubernetes API client"), "msg: {msg}");
            }
            other => panic!("expected Provider, got {other:?}"),
        }

        // Even without a cluster, the pure preconditions are evaluated first.
        let missing = RevisionId(ulid::Ulid::from(0xFFFF_u128));
        let precondition_err = handler.warm_revision(&env, missing).await.unwrap_err();
        assert!(matches!(
            precondition_err,
            DeployerError::RevisionNotFound { .. }
        ));
    }
}