stormchaser-runner-k8s 1.4.2

A robust, distributed workflow engine for event-driven and human-triggered workflows.
Documentation
use crate::job_machine::{JobMetadata, K8sJobSpec};
use anyhow::{Context, Result};
use k8s_openapi::api::core::v1::{
    Container, EnvVar as K8sEnvVar, PodSpec, PodTemplateSpec, ResourceRequirements as K8sResources,
    Volume,
};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use std::collections::BTreeMap;

use super::*;
use stormchaser_model::dsl;

pub(crate) struct StepSpec {
    pub(crate) image: String,
    pub(crate) command: Option<Vec<String>>,
    pub(crate) args: Option<Vec<String>>,
    pub(crate) env: Vec<K8sEnvVar>,
    pub(crate) resources: K8sResources,
    pub(crate) active_deadline: Option<i64>,
    pub(crate) backoff_limit: Option<i32>,
    pub(crate) completions: Option<i32>,
    pub(crate) parallelism: Option<i32>,
    pub(crate) ttl_seconds_after_finished: Option<i32>,
    pub(crate) privileged: Option<bool>,
    pub(crate) node_selector: Option<BTreeMap<String, String>>,
    pub(crate) service_account_name: Option<String>,
    pub(crate) restart_policy: Option<String>,
    pub(crate) extra_labels: Option<BTreeMap<String, String>>,
    pub(crate) extra_annotations: Option<BTreeMap<String, String>>,
    pub(crate) storage_mounts: Vec<dsl::StorageMount>,
    pub(crate) secret_mounts: Vec<dsl::SecretMount>,
    pub(crate) config_map_mounts: Vec<dsl::ConfigMapMount>,
}

pub(crate) fn build_k8s_pod_spec(
    container: Container,
    init_containers: Vec<Container>,
    volumes: Vec<Volume>,
    metadata: &JobMetadata,
    step_spec: &StepSpec,
) -> PodTemplateSpec {
    let image_pull_secrets = if metadata.registry_auth.is_some() {
        let step_id_prefix: String = metadata.step_id.to_string().chars().take(8).collect();
        let secret_name = format!(
            "storm-{}-{}-auth",
            metadata.step_dsl.name.to_lowercase().replace('_', "-"),
            step_id_prefix
        );
        Some(vec![k8s_openapi::api::core::v1::LocalObjectReference {
            name: secret_name,
        }])
    } else {
        None
    };

    PodTemplateSpec {
        metadata: Some(ObjectMeta {
            labels: Some(BTreeMap::from([
                ("managed-by".to_string(), "stormchaser".to_string()),
                (
                    "stormchaser-run-id".to_string(),
                    metadata.run_id.to_string(),
                ),
                (
                    "stormchaser-step-id".to_string(),
                    metadata.step_id.to_string(),
                ),
                (
                    "stormchaser-fencing-token".to_string(),
                    metadata.fencing_token.to_string(),
                ),
            ])),
            ..Default::default()
        }),
        spec: Some(PodSpec {
            containers: vec![container],
            init_containers: Some(init_containers),
            restart_policy: Some(
                step_spec
                    .restart_policy
                    .clone()
                    .unwrap_or_else(|| "OnFailure".to_string()),
            ),
            volumes: Some(volumes),
            node_selector: step_spec
                .node_selector
                .clone()
                .map(|ns| ns.into_iter().collect()),
            service_account_name: step_spec.service_account_name.clone(),
            image_pull_secrets,
            ..Default::default()
        }),
    }
}

pub(crate) fn parse_step_spec(metadata: &JobMetadata) -> Result<StepSpec> {
    match metadata.step_dsl.r#type.as_str() {
        "RunContainer" => {
            let spec: dsl::CommonContainerSpec =
                serde_json::from_value(metadata.step_dsl.spec.clone())
                    .context("Failed to parse RunContainer spec as CommonContainerSpec")?;

            let env = map_dsl_env_to_k8s(spec.env.unwrap_or_default());

            let resources = build_k8s_resources(spec.cpu, spec.memory);

            Ok(StepSpec {
                image: spec.image,
                command: spec.command,
                args: spec.args,
                env,
                resources,
                active_deadline: None,
                backoff_limit: None,
                completions: None,
                parallelism: None,
                ttl_seconds_after_finished: None,
                privileged: spec.privileged,
                node_selector: None,
                service_account_name: None,
                restart_policy: None,
                extra_labels: None,
                extra_annotations: None,
                storage_mounts: spec.storage_mounts.unwrap_or_default(),
                secret_mounts: Vec::new(),
                config_map_mounts: Vec::new(),
            })
        }
        "RunK8sJob" => {
            let spec: K8sJobSpec = serde_json::from_value(metadata.step_dsl.spec.clone())
                .context("Failed to parse RunK8sJob spec as K8sJobSpec")?;

            let env = map_dsl_env_to_k8s(spec.env.unwrap_or_default());

            Ok(StepSpec {
                image: spec.image,
                command: spec.command,
                args: spec.args,
                env,
                resources: spec.resources.unwrap_or_default(),
                active_deadline: spec.active_deadline_seconds,
                backoff_limit: spec.backoff_limit,
                completions: spec.completions,
                parallelism: spec.parallelism,
                ttl_seconds_after_finished: spec.ttl_seconds_after_finished,
                privileged: spec.privileged,
                node_selector: spec.node_selector,
                service_account_name: spec.service_account_name,
                restart_policy: spec.restart_policy,
                extra_labels: spec.labels,
                extra_annotations: spec.annotations,
                storage_mounts: spec.storage_mounts.unwrap_or_default(),
                secret_mounts: spec.secret_mounts.unwrap_or_default(),
                config_map_mounts: spec.config_map_mounts.unwrap_or_default(),
            })
        }
        _ => {
            let params = &metadata.step_dsl.params;
            let image = params.get("image").map(|s| s.to_string()).context(format!(
                "Missing 'image' for {} step (neither in spec nor params)",
                metadata.step_dsl.r#type
            ))?;

            Ok(StepSpec {
                image,
                command: params.get("command").map(|c| vec![c.to_string()]),
                args: params.get("args").map(|a| vec![a.to_string()]),
                env: Vec::new(),
                resources: K8sResources::default(),
                active_deadline: None,
                backoff_limit: None,
                completions: None,
                parallelism: None,
                ttl_seconds_after_finished: None,
                privileged: None,
                node_selector: None,
                service_account_name: None,
                restart_policy: None,
                extra_labels: None,
                extra_annotations: None,
                storage_mounts: Vec::new(),
                secret_mounts: Vec::new(),
                config_map_mounts: Vec::new(),
            })
        }
    }
}

pub(crate) fn normalize_resource_name(name: &str, prefix: &str) -> String {
    format!("{}-{}", prefix, name.to_lowercase().replace('_', "-"))
}