use chrono::Utc;
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use stormchaser_model::dsl::Step;
use stormchaser_model::events::StepCompletedEvent;
use stormchaser_model::events::StepFailedEvent;
use stormchaser_model::events::{EventSource, EventType, SchemaVersion, StepEventType};
use stormchaser_model::nats::publish_cloudevent;
use stormchaser_model::nats::NatsSubject;
use stormchaser_model::RunId;
use stormchaser_model::StepInstanceId;
use tokio::time::sleep;
use uuid::Uuid;
use stormchaser_model::dsl;
use crate::cluster::ClusterPool;
use crate::job_machine;
pub fn fallback_step(payload: &Value, spec: serde_json::Value) -> Step {
Step {
name: payload["step_name"]
.as_str()
.unwrap_or_default()
.to_string(),
r#type: payload["step_type"]
.as_str()
.unwrap_or_default()
.to_string(),
spec,
params: serde_json::from_value(payload["params"].clone()).unwrap_or_default(),
condition: None,
strategy: None,
aggregation: Vec::new(),
iterate: None,
iterate_as: None,
steps: None,
next: Vec::new(),
on_failure: None,
aliases: std::collections::HashMap::new(),
retry: None,
timeout: None,
allow_failure: None,
start_marker: None,
end_marker: None,
outputs: Vec::new(),
reports: Vec::new(),
artifacts: None,
}
}
#[allow(clippy::too_many_arguments)]
async fn execute_job_on_cluster(
client: kube::Client,
cluster_version: String,
run_id: Uuid,
step_id: Uuid,
fencing_token: i64,
step_dsl: dsl::Step,
storage: Option<HashMap<String, Value>>,
test_report_urls: Option<HashMap<String, Value>>,
registry_auth: Option<Value>,
runner_id: String,
nats_client: async_nats::Client,
encryption_key: Option<String>,
received_at: chrono::DateTime<chrono::Utc>,
in_progress_handle: tokio::task::JoinHandle<()>,
msg: async_nats::jetstream::message::Message,
) {
let namespace = std::env::var("KUBERNETES_NAMESPACE").unwrap_or_else(|_| "default".to_string());
let metadata = job_machine::JobMetadata {
run_id,
step_id,
fencing_token,
step_dsl,
namespace,
received_at,
cluster_version,
encryption_key,
storage,
test_report_urls,
registry_auth,
};
let machine = job_machine::K8sJobMachine::new(client.clone(), metadata.clone());
let result = match machine.start().await {
Ok(job_machine::StartResult::Running(running_machine)) => running_machine.wait().await,
Ok(job_machine::StartResult::Failed(finished_machine)) => {
Ok(finished_machine.into_result())
}
Err(e) => Err(e),
};
in_progress_handle.abort();
let _ = msg.double_ack().await;
publish_job_result(
nats_client,
run_id,
step_id,
metadata.fencing_token,
runner_id,
result,
)
.await;
}
async fn publish_job_result(
nats_client: async_nats::Client,
run_id: Uuid,
step_id: Uuid,
fencing_token: i64,
runner_id: String,
result: Result<job_machine::JobState, anyhow::Error>,
) {
match result {
Ok(state) => {
match &state {
job_machine::JobState::Succeeded(_) => {
tracing::info!("Step {} (Run {}) completed successfully", step_id, run_id);
}
job_machine::JobState::Failed(reason, _) => {
tracing::error!("Step {} (Run {}) failed: {}", step_id, run_id, reason);
}
}
let (subject, event_type, event) =
build_job_result_event(state, run_id, step_id, fencing_token, runner_id.clone());
let _ = publish_cloudevent(
&async_nats::jetstream::new(nats_client.clone()),
subject,
event_type,
EventSource::System,
event,
Some(SchemaVersion::new("1.0".to_string())),
None,
)
.await;
}
Err(e) => {
tracing::error!("Error running K8s job for step {}: {:?}", step_id, e);
let _ = publish_cloudevent(
&async_nats::jetstream::new(nats_client.clone()),
NatsSubject::StepFailed(Some(stormchaser_model::nats::compute_shard_id(
&stormchaser_model::RunId::new(run_id),
))),
EventType::Step(StepEventType::Failed),
EventSource::System,
build_job_error_event(run_id, step_id, fencing_token, runner_id.clone(), &e),
Some(SchemaVersion::new("1.0".to_string())),
None,
)
.await;
}
}
}
/// Converts job metrics into the human-readable `outputs` map attached to step events.
///
/// The map has four keys:
/// - `"k8s exit code"`: the exit code;
/// - `"Number of attempts"`: the attempt count;
/// - `"run duration"` and `"run latency"`: timings rendered as `"<n>ms"` strings.
fn build_job_outputs(metrics: &job_machine::JobMetrics) -> HashMap<String, Value> {
    let duration = format!("{}ms", metrics.duration_ms);
    let latency = format!("{}ms", metrics.latency_ms);

    HashMap::from([
        ("k8s exit code".to_string(), serde_json::json!(metrics.exit_code)),
        ("Number of attempts".to_string(), serde_json::json!(metrics.attempts)),
        ("run duration".to_string(), serde_json::json!(duration)),
        ("run latency".to_string(), serde_json::json!(latency)),
    ])
}
fn build_job_result_event(
state: job_machine::JobState,
run_id: Uuid,
step_id: Uuid,
fencing_token: i64,
runner_id: String,
) -> (NatsSubject, EventType, Value) {
match state {
job_machine::JobState::Succeeded(metrics) => {
let event_type = EventType::Step(StepEventType::Completed);
let outputs = build_job_outputs(&metrics);
let event = StepCompletedEvent {
run_id: RunId::new(run_id),
step_id: StepInstanceId::new(step_id),
fencing_token,
event_type: event_type.clone(),
runner_id: Some(runner_id),
exit_code: metrics.exit_code,
storage_hashes: metrics.storage_hashes.map(|h| {
h.into_iter()
.map(|(k, v)| (k, serde_json::json!(v)))
.collect()
}),
artifacts: metrics.artifacts,
test_reports: metrics.test_reports,
outputs: Some(outputs),
timestamp: Utc::now(),
};
(
NatsSubject::StepCompleted(Some(stormchaser_model::nats::compute_shard_id(
&stormchaser_model::RunId::new(run_id),
))),
event_type,
serde_json::to_value(event).unwrap(),
)
}
job_machine::JobState::Failed(reason, metrics) => {
let event_type = EventType::Step(StepEventType::Failed);
let outputs = build_job_outputs(&metrics);
let event = StepFailedEvent {
run_id: RunId::new(run_id),
step_id: StepInstanceId::new(step_id),
fencing_token,
event_type: event_type.clone(),
error: reason,
runner_id: Some(runner_id),
exit_code: metrics.exit_code,
storage_hashes: metrics.storage_hashes.map(|h| {
h.into_iter()
.map(|(k, v)| (k, serde_json::json!(v)))
.collect()
}),
artifacts: metrics.artifacts,
test_reports: metrics.test_reports,
outputs: Some(outputs),
timestamp: Utc::now(),
};
(
NatsSubject::StepFailed(Some(stormchaser_model::nats::compute_shard_id(
&stormchaser_model::RunId::new(run_id),
))),
event_type,
serde_json::to_value(event).unwrap(),
)
}
}
}
/// Serializes a `StepFailed` event for a job that errored before producing a result.
///
/// The error is rendered with its `Debug` representation. The exit code,
/// storage hashes, artifacts, test reports and outputs are all left empty.
fn build_job_error_event(
    run_id: Uuid,
    step_id: Uuid,
    fencing_token: i64,
    runner_id: String,
    error: &anyhow::Error,
) -> Value {
    let message = format!("{:?}", error);
    let failure = StepFailedEvent {
        run_id: RunId::new(run_id),
        step_id: StepInstanceId::new(step_id),
        runner_id: Some(runner_id),
        fencing_token,
        event_type: EventType::Step(StepEventType::Failed),
        error: message,
        exit_code: None,
        outputs: None,
        storage_hashes: None,
        artifacts: None,
        test_reports: None,
        timestamp: Utc::now(),
    };
    serde_json::to_value(failure).unwrap()
}
pub async fn handle_task(
msg: async_nats::jetstream::message::Message,
cluster_pool: Arc<ClusterPool>,
nats_client: async_nats::Client,
runner_id: String,
encryption_key: Option<String>,
) {
let received_at = Utc::now();
tracing::info!("Received task message: {:?}", msg.subject);
let ce: cloudevents::Event = match serde_json::from_slice(&msg.payload) {
Ok(e) => e,
Err(e) => {
tracing::error!("Failed to parse CloudEvent payload: {:?}", e);
if let Err(ack_err) = msg.double_ack().await {
tracing::warn!("Failed to ack message after parsing error: {:?}", ack_err);
}
return;
}
};
let payload: Value = if let Some(cloudevents::Data::Json(v)) = ce.data() {
v.clone()
} else {
tracing::error!("CloudEvent data is not JSON");
if let Err(ack_err) = msg.ack().await {
tracing::warn!("Failed to ack unparseable task message: {:?}", ack_err);
}
return;
};
let run_id_str = payload["run_id"].as_str().unwrap_or_default();
let run_id = match Uuid::parse_str(run_id_str) {
Ok(id) => id,
Err(e) => {
tracing::error!("Invalid run_id '{}' in task message: {:?}", run_id_str, e);
if let Err(ack_err) = msg.ack().await {
tracing::warn!("Failed to ack invalid-run_id task message: {:?}", ack_err);
}
return;
}
};
let step_id_str = payload["step_id"].as_str().unwrap_or_default();
let step_id = match Uuid::parse_str(step_id_str) {
Ok(id) => id,
Err(e) => {
tracing::error!("Invalid step_id '{}' in task message: {:?}", step_id_str, e);
if let Err(ack_err) = msg.ack().await {
tracing::warn!("Failed to ack invalid-step_id task message: {:?}", ack_err);
}
return;
}
};
let spec = serde_json::from_value(payload["spec"].clone()).unwrap_or(serde_json::Value::Null);
let step_dsl: dsl::Step = if let Some(dsl_val) = payload.get("step_dsl") {
if !dsl_val.is_null() {
if let Ok(mut step) = serde_json::from_value::<dsl::Step>(dsl_val.clone()) {
step.spec = spec;
step
} else {
fallback_step(&payload, spec)
}
} else {
fallback_step(&payload, spec)
}
} else {
fallback_step(&payload, spec)
};
let storage: Option<HashMap<String, Value>> =
serde_json::from_value(payload["storage"].clone()).ok();
let test_report_urls: Option<HashMap<String, Value>> =
serde_json::from_value(payload["test_report_urls"].clone()).ok();
let registry_auth: Option<Value> =
serde_json::from_value(payload["registry_auth"].clone()).ok();
let fencing_token: i64 = payload["fencing_token"].as_i64().unwrap_or(0);
let in_progress_msg = msg.clone();
let in_progress_handle = tokio::spawn(async move {
loop {
sleep(Duration::from_secs(15)).await;
let _ = in_progress_msg
.ack_with(async_nats::jetstream::message::AckKind::Progress)
.await;
}
});
let running_event = stormchaser_model::events::StepRunningEvent {
run_id: RunId::new(run_id),
step_id: StepInstanceId::new(step_id),
event_type: EventType::Step(StepEventType::Running),
runner_id: Some(runner_id.clone()),
timestamp: Utc::now(),
};
let _ = publish_cloudevent(
&async_nats::jetstream::new(nats_client.clone()),
NatsSubject::StepRunning(Some(stormchaser_model::nats::compute_shard_id(
&stormchaser_model::RunId::new(run_id),
))),
EventType::Step(StepEventType::Running),
EventSource::System,
serde_json::to_value(running_event).unwrap(),
Some(SchemaVersion::new("1.0".to_string())),
None,
)
.await;
let target_cluster = "local"; match cluster_pool.get_client(target_cluster).await {
Ok((client, cluster_version)) => {
execute_job_on_cluster(
client,
cluster_version,
run_id,
step_id,
fencing_token,
step_dsl,
storage,
test_report_urls,
registry_auth,
runner_id,
nats_client,
encryption_key,
received_at,
in_progress_handle,
msg,
)
.await;
}
Err(e) => {
in_progress_handle.abort();
tracing::error!("Failed to acquire K8s client: {:?}", e);
let fail_event = StepFailedEvent {
run_id: RunId::new(run_id),
step_id: StepInstanceId::new(step_id),
fencing_token,
event_type: EventType::Step(StepEventType::Failed),
error: format!("Failed to acquire K8s client: {:?}", e),
runner_id: Some(runner_id.clone()),
exit_code: None,
storage_hashes: None,
artifacts: None,
test_reports: None,
outputs: None,
timestamp: Utc::now(),
};
let _ = publish_cloudevent(
&async_nats::jetstream::new(nats_client.clone()),
NatsSubject::StepFailed(Some(stormchaser_model::nats::compute_shard_id(
&stormchaser_model::RunId::new(run_id),
))),
EventType::Step(StepEventType::Failed),
EventSource::System,
serde_json::to_value(fail_event).unwrap(),
Some(SchemaVersion::new("1.0".to_string())),
None,
)
.await;
if let Err(ack_err) = msg.double_ack().await {
tracing::warn!(
"Failed to ack task message after client acquisition failure: {:?}",
ack_err
);
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::job_machine::JobMetrics;
    use serde_json::json;
    use stormchaser_model::nats::NatsSubject;
    use uuid::Uuid;

    /// Subject a `StepCompleted` event for `run_id` is expected to be routed to.
    fn completed_subject(run_id: Uuid) -> NatsSubject {
        NatsSubject::StepCompleted(Some(stormchaser_model::nats::compute_shard_id(
            &stormchaser_model::RunId::new(run_id),
        )))
    }

    /// Subject a `StepFailed` event for `run_id` is expected to be routed to.
    fn failed_subject(run_id: Uuid) -> NatsSubject {
        NatsSubject::StepFailed(Some(stormchaser_model::nats::compute_shard_id(
            &stormchaser_model::RunId::new(run_id),
        )))
    }

    #[test]
    fn test_fallback_step() {
        let expected_spec = json!({"parallelism": 2});
        let payload = json!({
            "step_name": "test-step",
            "step_type": "K8sJob",
            "params": { "image": "nginx" }
        });

        let built = fallback_step(&payload, expected_spec.clone());

        assert_eq!(built.name, "test-step");
        assert_eq!(built.r#type, "K8sJob");
        assert_eq!(built.spec, expected_spec);
        assert_eq!(built.params.get("image").unwrap(), "nginx");
    }

    #[test]
    fn test_fallback_step_defaults_when_fields_missing_or_invalid() {
        let malformed = json!({
            "step_name": null,
            "step_type": null,
            "params": "invalid"
        });

        let built = fallback_step(&malformed, Value::Null);

        assert_eq!(built.name, "");
        assert_eq!(built.r#type, "");
        assert!(built.params.is_empty());
    }

    #[test]
    fn test_build_job_result_event_success() {
        let (run_id, step_id) = (Uuid::new_v4(), Uuid::new_v4());
        let junit_url = "https://paninfracon.net/junit.xml";
        let metrics = JobMetrics {
            exit_code: Some(0),
            attempts: 2,
            duration_ms: 500,
            latency_ms: 12,
            storage_hashes: Some(HashMap::from([("out".to_string(), "abc".to_string())])),
            artifacts: Some(HashMap::from([("file".to_string(), json!("artifact.txt"))])),
            test_reports: Some(HashMap::from([(
                "junit".to_string(),
                json!({ "url": junit_url }),
            )])),
        };

        let (subject, kind, payload) = build_job_result_event(
            job_machine::JobState::Succeeded(metrics),
            run_id,
            step_id,
            0,
            "runner-k8s".to_string(),
        );

        assert_eq!(subject, completed_subject(run_id));
        assert_eq!(kind, EventType::Step(StepEventType::Completed));
        assert_eq!(payload["run_id"], run_id.to_string());
        assert_eq!(payload["step_id"], step_id.to_string());
        assert_eq!(payload["outputs"]["Number of attempts"], 2);
        assert_eq!(payload["test_reports"]["junit"]["url"], junit_url);
    }

    #[test]
    fn test_build_job_result_event_failed() {
        let (run_id, step_id) = (Uuid::new_v4(), Uuid::new_v4());
        let metrics = JobMetrics {
            exit_code: Some(1),
            attempts: 1,
            duration_ms: 10,
            latency_ms: 3,
            storage_hashes: None,
            artifacts: None,
            test_reports: None,
        };

        let (subject, kind, payload) = build_job_result_event(
            job_machine::JobState::Failed("boom".to_string(), metrics),
            run_id,
            step_id,
            0,
            "runner-k8s".to_string(),
        );

        assert_eq!(subject, failed_subject(run_id));
        assert_eq!(kind, EventType::Step(StepEventType::Failed));
        assert_eq!(payload["error"], "boom");
        assert_eq!(payload["outputs"]["run latency"], "3ms");
    }

    #[test]
    fn test_build_job_error_event() {
        let (run_id, step_id) = (Uuid::new_v4(), Uuid::new_v4());
        let cause = anyhow::anyhow!("job exploded");

        let payload = build_job_error_event(run_id, step_id, 0, "runner-k8s".to_string(), &cause);

        assert_eq!(payload["run_id"], run_id.to_string());
        assert_eq!(payload["step_id"], step_id.to_string());
        assert!(payload["exit_code"].is_null());
        assert!(payload["outputs"].is_null());
        let rendered = payload["error"].as_str().unwrap_or_default();
        assert!(rendered.contains("job exploded"));
    }
}