stormchaser_engine/handler/step/events/
failed.rs1use crate::handler::{archive_workflow, dispatch_pending_steps, fetch_run, fetch_step_instance};
2use crate::workflow_machine::{state, WorkflowMachine};
3use anyhow::{Context, Result};
4use chrono::Utc;
5use serde_json::Value;
6use sqlx::PgPool;
7use std::sync::Arc;
8use stormchaser_model::events::WorkflowFailedEvent;
9use stormchaser_model::step::StepStatus;
10use stormchaser_model::RunId;
11use stormchaser_model::StepInstanceId;
12use stormchaser_tls::TlsReloader;
13use tracing::{error, info};
14
15use crate::handler::step::quota::release_step_quota_for_instance;
16
17use super::helpers::persist_step_test_reports;
18
19#[tracing::instrument(skip(payload, pool, nats_client, tls_reloader), fields(run_id = tracing::field::Empty, step_id = tracing::field::Empty))]
20pub async fn handle_step_failed(
22 payload: Value,
23 pool: PgPool,
24 nats_client: async_nats::Client,
25 tls_reloader: Arc<TlsReloader>,
26) -> Result<()> {
27 let run_id_str = payload["run_id"].as_str().context("Missing run_id")?;
28 let run_id = uuid::Uuid::parse_str(run_id_str).map(RunId::new)?;
29 let step_id_str = payload["step_id"].as_str().context("Missing step_id")?;
30 let step_id = uuid::Uuid::parse_str(step_id_str).map(StepInstanceId::new)?;
31
32 let span = tracing::Span::current();
33 span.record("run_id", tracing::field::display(run_id));
34 span.record("step_id", tracing::field::display(step_id));
35 let error_msg = payload["error"].as_str().unwrap_or("Unknown error");
36 let exit_code = payload["exit_code"].as_i64().map(|c| c as i32);
37
38 info!("Step {} (Run {}) failed: {}", step_id, run_id, error_msg);
39
40 let mut tx = pool.begin().await?;
41
42 if crate::db::lock_workflow_run(&mut *tx, run_id)
43 .await?
44 .is_none()
45 {
46 return Ok(());
47 }
48
49 let instance = fetch_step_instance(step_id, &mut *tx).await?;
50
51 if instance.status == StepStatus::Succeeded || instance.status == StepStatus::Failed {
53 return Ok(());
54 }
55
56 let _ = release_step_quota_for_instance(&mut *tx, run_id, step_id).await;
57
58 let machine =
59 crate::step_machine::StepMachine::<crate::step_machine::state::Running>::from_instance(
60 instance.clone(),
61 );
62 let _ = machine
63 .fail(error_msg.to_string(), exit_code, &mut *tx)
64 .await?;
65
66 let attributes = [
67 opentelemetry::KeyValue::new("step_name", instance.step_name),
68 opentelemetry::KeyValue::new("step_type", instance.step_type),
69 opentelemetry::KeyValue::new("runner_id", instance.runner_id.unwrap_or_default()),
70 opentelemetry::KeyValue::new("error", error_msg.to_string()),
71 ];
72
73 crate::STEPS_FAILED.add(1, &attributes);
74
75 if let Some(started_at) = instance.started_at {
76 let duration = (Utc::now() - started_at).to_std().unwrap_or_default();
77 crate::STEP_DURATION.record(duration.as_secs_f64(), &attributes);
78 }
79
80 if let Some(outputs) = payload["outputs"].as_object() {
81 for (key, value) in outputs {
82 crate::db::upsert_step_output(&mut *tx, step_id, key, value).await?;
83 }
84 }
85
86 persist_step_test_reports(&payload, &mut tx, run_id, step_id, &pool).await?;
88
89 let run = fetch_run(run_id, &mut *tx).await?;
90 let run_machine = WorkflowMachine::<state::Running>::new_from_run(run.clone());
91 let _ = run_machine
92 .fail(format!("Step {} failed: {}", step_id, error_msg), &mut *tx)
93 .await?;
94
95 let js = async_nats::jetstream::new(nats_client.clone());
96 if let Err(e) = stormchaser_model::nats::publish_cloudevent(
97 &js,
98 "stormchaser.v1.run.failed",
99 "workflow_failed",
100 "stormchaser-engine",
101 serde_json::to_value(WorkflowFailedEvent {
102 run_id,
103 event_type: "workflow_failed".to_string(),
104 timestamp: chrono::Utc::now(),
105 })
106 .unwrap(),
107 None,
108 None,
109 )
110 .await
111 {
112 error!(
113 "Failed to publish workflow failed event for {}: {:?}",
114 run_id, e
115 );
116 }
117
118 crate::RUNS_FAILED.add(
119 1,
120 &[
121 opentelemetry::KeyValue::new("workflow_name", run.workflow_name),
122 opentelemetry::KeyValue::new("initiating_user", run.initiating_user),
123 opentelemetry::KeyValue::new("error", format!("Step {} failed", step_id)),
124 ],
125 );
126
127 tx.commit().await?;
128 info!("Workflow run {} failed, archiving...", run_id);
129 archive_workflow(run_id, pool.clone()).await?;
130
131 if let Err(e) = dispatch_pending_steps(run_id, pool, nats_client, tls_reloader).await {
132 error!(
133 "Failed to dispatch pending steps after failure for run {}: {:?}",
134 run_id, e
135 );
136 }
137
138 Ok(())
139}