use crate::container_machine::{ContainerMetadata, ContainerState, DockerContainerMachine};
use crate::parsing::{parse_step_from_docker_labels, parse_step_from_nats_payload};
use anyhow::Result;
use async_nats::jetstream::message::{AckKind, Message};
use bollard::container::ListContainersOptions;
use bollard::Docker;
use cloudevents::EventBuilder;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::time::Duration;
use stormchaser_model::events::StepCompletedEvent;
use stormchaser_model::events::StepFailedEvent;
use tokio::time::sleep;
use tracing::{error, info, warn};
use uuid::Uuid;
async fn term_message(msg: &Message, reason: &str) {
error!("{}", reason);
if let Err(e) = msg.ack_with(AckKind::Term).await {
error!("Failed to send Term ack: {:?}", e);
}
}
/// Wraps `data` in a CloudEvents v1.0 event and serializes it to JSON bytes.
///
/// The event gets a fresh random UUID as its id, the given `event_type` and `source`, the
/// current UTC time, and `application/json` as its data content type.
///
/// # Errors
///
/// Fails if the CloudEvent builder rejects the attributes or if JSON serialization fails.
fn build_cloudevent_payload(
    event_type: &str,
    source: &str,
    data: Value,
) -> anyhow::Result<bytes::Bytes> {
    let event_id = Uuid::new_v4().to_string();
    let built = cloudevents::EventBuilderV10::new()
        .id(event_id)
        .ty(event_type)
        .source(source)
        .time(chrono::Utc::now())
        .data("application/json", data)
        .build();
    let event = match built {
        Ok(ev) => ev,
        Err(build_err) => {
            return Err(anyhow::anyhow!("Failed to build CloudEvent: {}", build_err));
        }
    };
    let serialized = serde_json::to_string(&event)?;
    Ok(bytes::Bytes::from(serialized))
}
pub async fn scan_for_orphans(
docker: Docker,
nats_client: async_nats::Client,
runner_id: String,
encryption_key: Option<String>,
) -> Result<()> {
info!("Scanning for orphaned Docker containers...");
let mut filters = HashMap::new();
filters.insert("label", vec!["managed-by=stormchaser"]);
let containers = docker
.list_containers(Some(ListContainersOptions {
all: true,
filters,
..Default::default()
}))
.await?;
for container in containers {
let container_id = container.id.unwrap_or_default();
let labels = container.labels.unwrap_or_default();
let run_id_str = labels.get("stormchaser-run-id");
let step_id_str = labels.get("stormchaser-step-id");
if let (Some(run_id_s), Some(step_id_s)) = (run_id_str, step_id_str) {
let run_id = Uuid::parse_str(run_id_s).unwrap_or_default();
let step_id = Uuid::parse_str(step_id_s).unwrap_or_default();
let container_name = container
.names
.and_then(|names| names.first().cloned())
.unwrap_or_else(|| container_id.clone())
.replace('/', "");
info!(
"Found orphaned container {} for step {} (run {})",
container_name, step_id, run_id
);
let received_at = labels
.get("stormchaser.v1.io/received-at")
.and_then(|ts| chrono::DateTime::parse_from_rfc3339(ts).ok())
.map(|dt| dt.with_timezone(&chrono::Utc))
.unwrap_or_else(chrono::Utc::now);
let is_encrypted = labels
.get("stormchaser.v1.io/state-encrypted")
.map(|v| v == "true")
.unwrap_or(false);
let raw_step_dsl = labels.get("stormchaser.v1.io/step-dsl");
let step_dsl = match parse_step_from_docker_labels(
&container_name,
raw_step_dsl,
is_encrypted,
encryption_key.as_ref(),
) {
Ok(dsl) => dsl,
Err(e) => {
error!(
"Failed to parse step spec for container {}: {:?}",
container_name, e
);
continue;
}
};
let nats = nats_client.clone();
let r_id = runner_id.clone();
let docker_clone = docker.clone();
let key_clone = encryption_key.clone();
tokio::spawn(async move {
let query_data = json!({ "step_id": step_id });
let query_ce_payload = match build_cloudevent_payload(
"stormchaser.v1.step.query",
"/stormchaser",
query_data,
) {
Ok(p) => p,
Err(e) => {
error!("Failed to build step query CloudEvent: {:?}", e);
return;
}
};
match nats
.request("stormchaser.v1.step.query", query_ce_payload)
.await
{
Ok(reply) => {
let ce: cloudevents::Event = match serde_json::from_slice(&reply.payload) {
Ok(e) => e,
Err(e) => {
error!(
"Failed to parse step query response as CloudEvent: {:?}",
e
);
return;
}
};
let response: Value = match ce.data() {
Some(cloudevents::Data::Json(v)) => v.clone(),
_ => {
error!("Step query response CloudEvent has no JSON data");
return;
}
};
let status = response["status"].as_str().unwrap_or_default();
let exists = response["exists"].as_bool().unwrap_or(false);
if !exists || (status != "pending" && status != "running") {
info!("Step {} is in status {}, skipping adoption and cleaning up container {}", step_id, status, container_name);
let machine = DockerContainerMachine::new(
docker_clone,
ContainerMetadata {
run_id,
step_id,
step_dsl,
storage: None,
test_report_urls: None,
encryption_key: key_clone,
received_at,
},
Some(nats.clone()),
);
let _ = machine.clean_up(&container_name).await;
return;
}
}
Err(e) => {
error!("Failed to query orchestrator for step {}: {:?}", step_id, e);
return;
}
}
let metadata = ContainerMetadata {
run_id,
step_id,
step_dsl: step_dsl.clone(),
storage: None,
test_report_urls: None,
encryption_key: key_clone,
received_at,
};
let machine =
DockerContainerMachine::new(docker_clone, metadata, Some(nats.clone()));
match machine.adopt(container_name.clone()).wait().await {
Ok(finished_machine) => match finished_machine.into_result() {
ContainerState::Succeeded(metrics) => {
info!("Adopted step {} completed successfully", step_id);
let event = json!({
"run_id": run_id,
"step_id": step_id,
"status": "succeeded",
"runner_id": r_id,
"exit_code": metrics.exit_code,
"outputs": {
"docker exit code": metrics.exit_code,
"run duration": format!("{}ms", metrics.duration_ms),
"run latency": format!("{}ms", metrics.latency_ms),
}
});
let _ = stormchaser_model::nats::publish_cloudevent(
&async_nats::jetstream::new(nats.clone()),
"stormchaser.v1.step.completed",
"stormchaser.v1.step.completed",
"/stormchaser",
serde_json::to_value(event).unwrap(),
Some("1.0"),
None,
)
.await;
}
ContainerState::Failed(reason, metrics) => {
warn!("Adopted step {} failed: {}", step_id, reason);
let event = json!({
"run_id": run_id,
"step_id": step_id,
"status": "failed",
"error": reason,
"runner_id": r_id,
"exit_code": metrics.exit_code,
"outputs": {
"docker exit code": metrics.exit_code,
"run duration": format!("{}ms", metrics.duration_ms),
"run latency": format!("{}ms", metrics.latency_ms),
}
});
let _ = stormchaser_model::nats::publish_cloudevent(
&async_nats::jetstream::new(nats.clone()),
"stormchaser.v1.step.failed",
"stormchaser.v1.step.failed",
"/stormchaser",
serde_json::to_value(event).unwrap(),
Some("1.0"),
None,
)
.await;
}
},
Err(e) => error!("Error adopting container {}: {:?}", container_name, e),
}
});
}
}
Ok(())
}
pub async fn handle_task(
msg: async_nats::jetstream::message::Message,
docker: bollard::Docker,
nats_client: async_nats::Client,
runner_id: String,
encryption_key: Option<String>,
) {
let received_at = chrono::Utc::now();
info!("Received task message: {:?}", msg.subject);
let ce: cloudevents::Event = match serde_json::from_slice(&msg.payload) {
Ok(event) => event,
Err(e) => {
term_message(
&msg,
&format!(
"Failed to deserialize CloudEvent from task message: {:?}",
e
),
)
.await;
return;
}
};
let payload: Value = match ce.data() {
Some(cloudevents::Data::Json(v)) => v.clone(),
_ => {
term_message(&msg, "Task message CloudEvent does not contain JSON data").await;
return;
}
};
let run_id = match payload["run_id"]
.as_str()
.and_then(|s| Uuid::parse_str(s).ok())
{
Some(id) => id,
None => {
term_message(&msg, "Task message missing or invalid run_id").await;
return;
}
};
let step_id = match payload["step_id"]
.as_str()
.and_then(|s| Uuid::parse_str(s).ok())
{
Some(id) => id,
None => {
term_message(&msg, "Task message missing or invalid step_id").await;
return;
}
};
let step_dsl = match parse_step_from_nats_payload(&payload) {
Ok(step) => step,
Err(e) => {
term_message(&msg, &format!("Failed to parse step spec: {:?}", e)).await;
return;
}
};
let storage: Option<HashMap<String, Value>> =
serde_json::from_value(payload["storage"].clone()).ok();
let test_report_urls: Option<HashMap<String, Value>> =
serde_json::from_value(payload["test_report_urls"].clone()).ok();
let in_progress_msg = msg.clone();
let in_progress_handle = tokio::spawn(async move {
loop {
sleep(Duration::from_secs(15)).await;
let _ = in_progress_msg.ack_with(AckKind::Progress).await;
}
});
let running_event = json!({
"run_id": run_id,
"step_id": step_id,
"status": "running",
"runner_id": runner_id,
"timestamp": chrono::Utc::now(),
});
let _ = stormchaser_model::nats::publish_cloudevent(
&async_nats::jetstream::new(nats_client.clone()),
"stormchaser.v1.step.running",
"stormchaser.v1.step.running",
"/stormchaser",
serde_json::to_value(running_event).unwrap(),
Some("1.0"),
None,
)
.await;
let machine = DockerContainerMachine::new(
docker,
ContainerMetadata {
run_id,
step_id,
step_dsl,
storage,
test_report_urls,
encryption_key,
received_at,
},
Some(nats_client.clone()),
);
let result = match machine.start().await {
Ok(crate::container_machine::StartResult::Running(running_machine)) => {
running_machine.wait().await.map(|m| m.into_result())
}
Ok(crate::container_machine::StartResult::Failed(finished_machine)) => {
Ok(finished_machine.into_result())
}
Err(e) => Err(e),
};
in_progress_handle.abort();
let _ = msg.double_ack().await;
match result {
Ok(ContainerState::Succeeded(metrics)) => {
info!("Step {} (Run {}) completed successfully", step_id, run_id);
let mut outputs = std::collections::HashMap::new();
outputs.insert(
"docker exit code".to_string(),
serde_json::json!(metrics.exit_code),
);
outputs.insert(
"run duration".to_string(),
serde_json::json!(format!("{}ms", metrics.duration_ms)),
);
outputs.insert(
"run latency".to_string(),
serde_json::json!(format!("{}ms", metrics.latency_ms)),
);
let event = StepCompletedEvent {
run_id: stormchaser_model::RunId::new(run_id),
step_id: stormchaser_model::StepInstanceId::new(step_id),
event_type: "stormchaser.v1.step.completed".to_string(),
runner_id: Some(runner_id.clone()),
exit_code: metrics.exit_code.map(|c| c as i32),
storage_hashes: metrics.storage_hashes.map(|h| {
h.into_iter()
.map(|(k, v)| (k, serde_json::json!(v)))
.collect()
}),
artifacts: metrics.artifacts,
test_reports: metrics
.test_reports
.and_then(|v| v.as_object().map(|obj| obj.clone().into_iter().collect())),
outputs: Some(outputs),
timestamp: chrono::Utc::now(),
};
let _ = stormchaser_model::nats::publish_cloudevent(
&async_nats::jetstream::new(nats_client.clone()),
"stormchaser.v1.step.completed",
"stormchaser.v1.step.completed",
"/stormchaser",
serde_json::to_value(event).unwrap(),
Some("1.0"),
None,
)
.await;
}
Ok(ContainerState::Failed(reason, metrics)) => {
error!("Step {} (Run {}) failed: {}", step_id, run_id, reason);
let mut outputs = std::collections::HashMap::new();
outputs.insert(
"docker exit code".to_string(),
serde_json::json!(metrics.exit_code),
);
outputs.insert(
"run duration".to_string(),
serde_json::json!(format!("{}ms", metrics.duration_ms)),
);
outputs.insert(
"run latency".to_string(),
serde_json::json!(format!("{}ms", metrics.latency_ms)),
);
let event = StepFailedEvent {
run_id: stormchaser_model::RunId::new(run_id),
step_id: stormchaser_model::StepInstanceId::new(step_id),
event_type: "stormchaser.v1.step.failed".to_string(),
error: reason,
runner_id: Some(runner_id.clone()),
exit_code: metrics.exit_code.map(|c| c as i32),
storage_hashes: metrics.storage_hashes.map(|h| {
h.into_iter()
.map(|(k, v)| (k, serde_json::json!(v)))
.collect()
}),
artifacts: metrics.artifacts,
test_reports: metrics
.test_reports
.and_then(|v| v.as_object().map(|obj| obj.clone().into_iter().collect())),
outputs: Some(outputs),
timestamp: chrono::Utc::now(),
};
let _ = stormchaser_model::nats::publish_cloudevent(
&async_nats::jetstream::new(nats_client.clone()),
"stormchaser.v1.step.failed",
"stormchaser.v1.step.failed",
"/stormchaser",
serde_json::to_value(event).unwrap(),
Some("1.0"),
None,
)
.await;
}
Err(e) => {
error!("Error running container for step {}: {:?}", step_id, e);
let event = json!({
"run_id": run_id,
"step_id": step_id,
"status": "failed",
"error": format!("{:?}", e),
"runner_id": runner_id,
});
let _ = stormchaser_model::nats::publish_cloudevent(
&async_nats::jetstream::new(nats_client.clone()),
"stormchaser.v1.step.failed",
"stormchaser.v1.step.failed",
"/stormchaser",
serde_json::to_value(event).unwrap(),
Some("1.0"),
None,
)
.await;
}
}
}