use std::collections::HashMap;
use async_trait::async_trait;
use k8s_openapi::api::batch::v1::{Job, JobSpec};
use k8s_openapi::api::core::v1::{
Container, EnvVar, EnvVarSource, PodSpec, PodTemplateSpec, ResourceRequirements,
SecretKeySelector,
};
use kube::api::{Api, ObjectMeta, PostParams};
use kube::Client;
use serde::{Deserialize, Serialize};
use crate::context::ExecutionContext;
use crate::error::ToolError;
use crate::registry::{Tool, ToolConfig};
use crate::result::ToolResult;
use crate::template::TemplateEngine;
/// Namespace the Job is created in when the step config omits `namespace`.
const DEFAULT_NAMESPACE: &str = "noetl";
/// Pod `restartPolicy` used when the step config omits `restart_policy`.
const DEFAULT_RESTART_POLICY: &str = "Never";
/// Job `backoffLimit` used when the step config omits `backoff_limit`.
const DEFAULT_BACKOFF_LIMIT: i32 = 0;
/// User-facing configuration of the `container` tool.
///
/// Deserialised from the step's rendered tool config in
/// `ContainerTool::parse_config`. Only `image` is required; every other field
/// carries a serde default.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContainerConfig {
    /// Container image reference; must be non-blank (checked in `build_job`).
    pub image: String,
    /// Becomes the container's Kubernetes `command`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub command: Option<Vec<String>>,
    /// Becomes the container's Kubernetes `args`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub args: Option<Vec<String>>,
    /// Environment variables; an empty list means no `env` on the container.
    #[serde(default)]
    pub env: Vec<ContainerEnvVar>,
    /// Resource requests/limits; empty maps are omitted from the manifest.
    #[serde(default)]
    pub resources: ContainerResources,
    /// Becomes the Job's `activeDeadlineSeconds`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub timeout_seconds: Option<i64>,
    /// Becomes the pod's `serviceAccountName`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub service_account: Option<String>,
    /// Target namespace; `DEFAULT_NAMESPACE` when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub namespace: Option<String>,
    /// Job `backoffLimit`; `DEFAULT_BACKOFF_LIMIT` when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub backoff_limit: Option<i32>,
    /// Pod `restartPolicy`; `DEFAULT_RESTART_POLICY` when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub restart_policy: Option<String>,
}
/// One environment variable for the container.
///
/// Set either `value` (a literal) or `value_from` (a secret reference). An
/// entry that sets both is rejected by `build_job`. An entry with neither is
/// emitted as an empty-string literal.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContainerEnvVar {
    /// Variable name as seen inside the container.
    pub name: String,
    /// Literal value.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub value: Option<String>,
    /// Secret-backed value; mutually exclusive with `value`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub value_from: Option<EnvValueFrom>,
}
/// Reference to one key of a Kubernetes Secret.
///
/// `build_job` renders it as a `secretKeyRef` with `optional: false`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnvValueFrom {
    /// Name of the Secret object.
    pub secret_name: String,
    /// Key within that Secret whose value is injected.
    pub secret_key: String,
}
/// Container resource requests and limits.
///
/// Keys are resource names (e.g. `cpu`, `memory`). Values are Kubernetes
/// quantity strings (e.g. `500m`, `1Gi`), which are wrapped in `Quantity` as-is
/// with no local validation.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ContainerResources {
    /// Resource requests; an empty map is omitted from the manifest.
    #[serde(default)]
    pub requests: HashMap<String, String>,
    /// Resource limits; an empty map is omitted from the manifest.
    #[serde(default)]
    pub limits: HashMap<String, String>,
}
/// Tool (registered as `container`) that runs a step as a Kubernetes `Job`.
pub struct ContainerTool {
    /// Renders the raw step config against the execution context before it
    /// is deserialised into a `ContainerConfig`.
    template_engine: TemplateEngine,
}
impl Default for ContainerTool {
fn default() -> Self {
Self::new()
}
}
impl ContainerTool {
pub fn new() -> Self {
Self {
template_engine: TemplateEngine::new(),
}
}
fn parse_config(
&self,
config: &ToolConfig,
ctx: &ExecutionContext,
) -> Result<ContainerConfig, ToolError> {
let template_ctx = ctx.to_template_context();
let rendered = self
.template_engine
.render_value(&config.config, &template_ctx)?;
serde_json::from_value(rendered).map_err(|e| {
ToolError::Configuration(format!("Invalid container config: {e}"))
})
}
fn build_job(cfg: &ContainerConfig, ctx: &ExecutionContext) -> Result<Job, ToolError> {
if cfg.image.trim().is_empty() {
return Err(ToolError::Configuration(
"container: image is required".to_string(),
));
}
let mut env_vars = Vec::with_capacity(cfg.env.len());
for ev in &cfg.env {
if ev.value.is_some() && ev.value_from.is_some() {
return Err(ToolError::Configuration(format!(
"container env var '{}': value and value_from are mutually exclusive",
ev.name
)));
}
let env = if let Some(vf) = &ev.value_from {
EnvVar {
name: ev.name.clone(),
value: None,
value_from: Some(EnvVarSource {
secret_key_ref: Some(SecretKeySelector {
name: vf.secret_name.clone(),
key: vf.secret_key.clone(),
optional: Some(false),
}),
..Default::default()
}),
}
} else {
EnvVar {
name: ev.name.clone(),
value: ev.value.clone().or_else(|| Some(String::new())),
value_from: None,
}
};
env_vars.push(env);
}
let resources = if cfg.resources.requests.is_empty()
&& cfg.resources.limits.is_empty()
{
None
} else {
use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
let to_qty_map = |m: &HashMap<String, String>| {
m.iter()
.map(|(k, v)| (k.clone(), Quantity(v.clone())))
.collect::<std::collections::BTreeMap<_, _>>()
};
Some(ResourceRequirements {
requests: if cfg.resources.requests.is_empty() {
None
} else {
Some(to_qty_map(&cfg.resources.requests))
},
limits: if cfg.resources.limits.is_empty() {
None
} else {
Some(to_qty_map(&cfg.resources.limits))
},
..Default::default()
})
};
let container = Container {
name: "main".to_string(),
image: Some(cfg.image.clone()),
command: cfg.command.clone(),
args: cfg.args.clone(),
env: if env_vars.is_empty() {
None
} else {
Some(env_vars)
},
resources,
..Default::default()
};
let pod_spec = PodSpec {
containers: vec![container],
restart_policy: Some(
cfg.restart_policy
.clone()
.unwrap_or_else(|| DEFAULT_RESTART_POLICY.to_string()),
),
service_account_name: cfg.service_account.clone(),
..Default::default()
};
let labels = std::collections::BTreeMap::from([
("noetl.execution-id".to_string(), ctx.execution_id.to_string()),
("noetl.step-name".to_string(), ctx.step.clone()),
("noetl.tool-kind".to_string(), "container".to_string()),
]);
let step_slug: String = ctx
.step
.chars()
.filter(|c| c.is_ascii_alphanumeric() || *c == '-')
.take(20)
.collect();
let generate_name = format!(
"noetl-container-{step}-{eid}-",
step = if step_slug.is_empty() {
"step".to_string()
} else {
step_slug.to_lowercase()
},
eid = ctx.execution_id
);
let job = Job {
metadata: ObjectMeta {
generate_name: Some(generate_name),
namespace: Some(
cfg.namespace
.clone()
.unwrap_or_else(|| DEFAULT_NAMESPACE.to_string()),
),
labels: Some(labels.clone()),
..Default::default()
},
spec: Some(JobSpec {
backoff_limit: Some(cfg.backoff_limit.unwrap_or(DEFAULT_BACKOFF_LIMIT)),
active_deadline_seconds: cfg.timeout_seconds,
template: PodTemplateSpec {
metadata: Some(ObjectMeta {
labels: Some(labels),
..Default::default()
}),
spec: Some(pod_spec),
},
..Default::default()
}),
status: None,
};
Ok(job)
}
}
#[async_trait]
impl Tool for ContainerTool {
    /// Registry name of this tool.
    fn name(&self) -> &'static str {
        "container"
    }

    /// Renders the step config, submits it as a Kubernetes `Job`, and returns
    /// as soon as the create call succeeds, without waiting for the Job to run.
    ///
    /// The success payload contains:
    /// - the created Job's `job_name` and `job_uid`;
    /// - its `namespace`;
    /// - the `labels` that were set on it.
    ///
    /// `pending_callback` is set to `Some(true)`.
    /// NOTE(review): presumably this tells the caller that completion is
    /// reported asynchronously — confirm against the registry/worker.
    ///
    /// # Errors
    ///
    /// - Configuration errors from `parse_config` / `build_job`.
    /// - [`ToolError::ExecutionFailed`] when the kube client cannot be
    ///   initialised or the Job create request fails.
    async fn execute(
        &self,
        config: &ToolConfig,
        ctx: &ExecutionContext,
    ) -> Result<ToolResult, ToolError> {
        let cfg = self.parse_config(config, ctx)?;
        let job_spec = Self::build_job(&cfg, ctx)?;

        // Read the namespace and labels back from the manifest itself. This way
        // the API target and the metadata reported below always match what is
        // submitted.
        let namespace = job_spec
            .metadata
            .namespace
            .clone()
            .unwrap_or_else(|| DEFAULT_NAMESPACE.to_string());
        let labels = job_spec.metadata.labels.clone().unwrap_or_default();

        let client = Client::try_default().await.map_err(|e| {
            ToolError::ExecutionFailed(format!("container: kube client init failed: {e}"))
        })?;
        let api: Api<Job> = Api::namespaced(client, &namespace);

        tracing::info!(
            execution_id = ctx.execution_id,
            step = %ctx.step,
            namespace = %namespace,
            image = %cfg.image,
            "container.dispatch"
        );

        let created = api
            .create(&PostParams::default(), &job_spec)
            .await
            .map_err(|e| {
                ToolError::ExecutionFailed(format!(
                    "container: Job create failed in namespace '{namespace}': {e}"
                ))
            })?;

        // The name is server-assigned because the manifest uses `generateName`.
        let job_name = created.metadata.name.unwrap_or_default();
        let job_uid = created.metadata.uid;

        tracing::info!(
            execution_id = ctx.execution_id,
            step = %ctx.step,
            job_name = %job_name,
            job_uid = job_uid.as_deref().unwrap_or(""),
            "container.dispatched"
        );

        let mut result = ToolResult::success(serde_json::json!({
            "job_name": job_name,
            "job_uid": job_uid,
            "namespace": namespace,
            "labels": labels,
        }));
        result.pending_callback = Some(true);
        Ok(result)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Context shared by most tests: fixed execution id, step `train_model`.
    fn test_ctx() -> ExecutionContext {
        ExecutionContext::new(900_000_000_000_000_001_i64, "train_model", "http://test")
    }

    /// Smallest valid config: an image and nothing else.
    fn bare_config() -> ContainerConfig {
        ContainerConfig {
            image: "alpine:3.19".to_string(),
            command: None,
            args: None,
            env: Vec::new(),
            resources: ContainerResources::default(),
            timeout_seconds: None,
            service_account: None,
            namespace: None,
            backoff_limit: None,
            restart_policy: None,
        }
    }

    /// Builds a Job with the shared context; panics if building fails.
    fn job_from(cfg: &ContainerConfig) -> Job {
        ContainerTool::build_job(cfg, &test_ctx()).expect("build")
    }

    /// Builds a Job for a context whose step name is replaced by `step`.
    fn job_for_step(cfg: &ContainerConfig, step: &str) -> Job {
        let mut c = test_ctx();
        c.step = step.to_string();
        ContainerTool::build_job(cfg, &c).expect("build")
    }

    /// Returns the pod spec embedded in a built Job.
    fn pod_spec_of(job: Job) -> PodSpec {
        job.spec.unwrap().template.spec.unwrap()
    }

    /// Returns the first (and only) container of a built Job.
    fn container_of(job: Job) -> Container {
        pod_spec_of(job).containers.into_iter().next().unwrap()
    }

    /// Returns the Job's `generateName`, or "" when unset.
    fn generate_name_of(job: &Job) -> &str {
        job.metadata.generate_name.as_deref().unwrap_or("")
    }

    /// Asserts that building fails with a Configuration error and returns its message.
    fn config_error_message(cfg: &ContainerConfig) -> String {
        match ContainerTool::build_job(cfg, &test_ctx()).expect_err("must reject") {
            ToolError::Configuration(m) => m,
            other => panic!("expected Configuration, got {other:?}"),
        }
    }

    #[test]
    fn build_job_sets_required_labels() {
        let job = job_from(&bare_config());
        let labels = job
            .metadata
            .labels
            .as_ref()
            .expect("labels present on metadata");
        let expected = [
            ("noetl.execution-id", "900000000000000001"),
            ("noetl.step-name", "train_model"),
            ("noetl.tool-kind", "container"),
        ];
        for (key, want) in expected {
            assert_eq!(labels.get(key).map(String::as_str), Some(want));
        }
    }

    #[test]
    fn build_job_uses_default_namespace() {
        let job = job_from(&bare_config());
        assert_eq!(job.metadata.namespace.as_deref(), Some("noetl"));
    }

    #[test]
    fn build_job_honours_explicit_namespace() {
        let cfg = ContainerConfig {
            namespace: Some("ml-platform".to_string()),
            ..bare_config()
        };
        let job = job_from(&cfg);
        assert_eq!(job.metadata.namespace.as_deref(), Some("ml-platform"));
    }

    #[test]
    fn build_job_generate_name_includes_step_and_eid() {
        let job = job_from(&bare_config());
        let gn = generate_name_of(&job);
        assert!(gn.starts_with("noetl-container-trainmodel-"), "got {gn}");
        assert!(gn.ends_with('-'), "generateName should end with hyphen for K8s suffix; got {gn}");
        assert!(gn.contains("900000000000000001"), "got {gn}");
    }

    #[test]
    fn build_job_propagates_command_and_args() {
        let cfg = ContainerConfig {
            command: Some(vec!["/bin/train".to_string()]),
            args: Some(vec!["--epochs".to_string(), "100".to_string()]),
            ..bare_config()
        };
        let container = container_of(job_from(&cfg));
        assert_eq!(container.command, Some(vec!["/bin/train".to_string()]));
        assert_eq!(
            container.args,
            Some(vec!["--epochs".to_string(), "100".to_string()])
        );
    }

    #[test]
    fn build_job_propagates_env_literal() {
        let cfg = ContainerConfig {
            env: vec![ContainerEnvVar {
                name: "TRAINING_RUN".to_string(),
                value: Some("run-42".to_string()),
                value_from: None,
            }],
            ..bare_config()
        };
        let env = container_of(job_from(&cfg)).env.expect("env present");
        assert_eq!(env.len(), 1);
        let var = &env[0];
        assert_eq!(var.name, "TRAINING_RUN");
        assert_eq!(var.value.as_deref(), Some("run-42"));
        assert!(var.value_from.is_none());
    }

    #[test]
    fn build_job_propagates_env_secret_ref() {
        let cfg = ContainerConfig {
            env: vec![ContainerEnvVar {
                name: "API_KEY".to_string(),
                value: None,
                value_from: Some(EnvValueFrom {
                    secret_name: "ml-secrets".to_string(),
                    secret_key: "openai_api_key".to_string(),
                }),
            }],
            ..bare_config()
        };
        let env = container_of(job_from(&cfg)).env.expect("env present");
        let source = env[0].value_from.as_ref().expect("value_from set");
        let secret = source.secret_key_ref.as_ref().expect("secret_key_ref set");
        assert_eq!(secret.name, "ml-secrets");
        assert_eq!(secret.key, "openai_api_key");
        assert!(env[0].value.is_none());
    }

    #[test]
    fn build_job_rejects_env_with_both_value_and_value_from() {
        let cfg = ContainerConfig {
            env: vec![ContainerEnvVar {
                name: "AMBIGUOUS".to_string(),
                value: Some("literal".to_string()),
                value_from: Some(EnvValueFrom {
                    secret_name: "s".to_string(),
                    secret_key: "k".to_string(),
                }),
            }],
            ..bare_config()
        };
        let msg = config_error_message(&cfg);
        assert!(msg.contains("value and value_from are mutually exclusive"), "got {msg}");
    }

    #[test]
    fn build_job_rejects_empty_image() {
        let cfg = ContainerConfig {
            image: String::new(),
            ..bare_config()
        };
        let msg = config_error_message(&cfg);
        assert!(msg.contains("image is required"), "got {msg}");
    }

    #[test]
    fn build_job_propagates_resources() {
        let mut cfg = bare_config();
        cfg.resources.requests.extend([
            ("cpu".to_string(), "500m".to_string()),
            ("memory".to_string(), "1Gi".to_string()),
        ]);
        cfg.resources.limits.extend([
            ("cpu".to_string(), "2".to_string()),
            ("memory".to_string(), "4Gi".to_string()),
        ]);
        let res = container_of(job_from(&cfg))
            .resources
            .expect("resources present");
        let requests = res.requests.expect("requests present");
        for (key, want) in [("cpu", "500m"), ("memory", "1Gi")] {
            assert_eq!(requests.get(key).map(|q| q.0.as_str()), Some(want));
        }
        let limits = res.limits.expect("limits present");
        for (key, want) in [("cpu", "2"), ("memory", "4Gi")] {
            assert_eq!(limits.get(key).map(|q| q.0.as_str()), Some(want));
        }
    }

    #[test]
    fn build_job_empty_resources_means_none() {
        let container = container_of(job_from(&bare_config()));
        assert!(container.resources.is_none(), "empty resources should serialise as None");
    }

    #[test]
    fn build_job_sets_backoff_and_deadline() {
        let cfg = ContainerConfig {
            backoff_limit: Some(3),
            timeout_seconds: Some(3600),
            ..bare_config()
        };
        let spec = job_from(&cfg).spec.unwrap();
        assert_eq!(spec.backoff_limit, Some(3));
        assert_eq!(spec.active_deadline_seconds, Some(3600));
    }

    #[test]
    fn build_job_default_backoff_is_zero() {
        let spec = job_from(&bare_config()).spec.unwrap();
        assert_eq!(spec.backoff_limit, Some(0));
    }

    #[test]
    fn build_job_default_restart_policy_is_never() {
        let policy = pod_spec_of(job_from(&bare_config())).restart_policy;
        assert_eq!(policy.as_deref(), Some("Never"));
    }

    #[test]
    fn build_job_step_slug_is_lowercased_and_truncated() {
        let job = job_for_step(
            &bare_config(),
            "VeryLongStepNameWithLotsOfCharactersExceedingTwentyChars",
        );
        let step_part = generate_name_of(&job)
            .trim_start_matches("noetl-container-")
            .split('-')
            .next()
            .unwrap();
        assert!(step_part.len() <= 20, "step slug too long: {step_part}");
        assert_eq!(step_part, step_part.to_lowercase(), "step slug not lowercase: {step_part}");
    }

    #[test]
    fn build_job_empty_step_falls_back_to_step_word() {
        let job = job_for_step(&bare_config(), "");
        let gn = generate_name_of(&job);
        assert!(gn.starts_with("noetl-container-step-"), "got {gn}");
    }

    #[test]
    fn build_job_propagates_service_account() {
        let cfg = ContainerConfig {
            service_account: Some("noetl-container-job".to_string()),
            ..bare_config()
        };
        let sa = pod_spec_of(job_from(&cfg)).service_account_name;
        assert_eq!(sa.as_deref(), Some("noetl-container-job"));
    }
}