stormchaser_engine/handler/step/events/
failed.rs1use crate::handler::{archive_workflow, dispatch_pending_steps, fetch_run, fetch_step_instance};
2use crate::workflow_machine::{state, WorkflowMachine};
3use anyhow::Result;
4use chrono::Utc;
5use sqlx::PgPool;
6use std::sync::Arc;
7use stormchaser_model::events::WorkflowFailedEvent;
8use stormchaser_model::events::{EventSource, EventType, WorkflowEventType};
9use stormchaser_model::nats::publish_cloudevent;
10use stormchaser_model::step::StepStatus;
11use stormchaser_tls::TlsReloader;
12use tracing::{error, info};
13
14use crate::handler::step::quota::release_step_quota_for_instance;
15
16use super::helpers::persist_step_test_reports;
17
18#[tracing::instrument(skip(event, pool, nats_client, tls_reloader), fields(run_id = tracing::field::Empty, step_id = tracing::field::Empty))]
19pub async fn handle_step_failed(
21 event: stormchaser_model::events::StepFailedEvent,
22 pool: PgPool,
23 nats_client: async_nats::Client,
24 tls_reloader: Arc<TlsReloader>,
25) -> Result<()> {
26 let run_id = event.run_id;
27 let step_id = event.step_id;
28
29 let span = tracing::Span::current();
30 span.record("run_id", tracing::field::display(run_id));
31 span.record("step_id", tracing::field::display(step_id));
32 let error_msg = &event.error;
33 let exit_code = event.exit_code;
34
35 info!("Step {} (Run {}) failed: {}", step_id, run_id, error_msg);
36
37 let mut tx = pool.begin().await?;
38
39 if crate::db::lock_workflow_run(&mut *tx, run_id)
40 .await?
41 .is_none()
42 {
43 return Ok(());
44 }
45
46 let instance = fetch_step_instance(step_id, &mut *tx).await?;
47
48 if instance.status == StepStatus::Succeeded || instance.status == StepStatus::Failed {
50 return Ok(());
51 }
52
53 let _ = release_step_quota_for_instance(&mut *tx, run_id, step_id).await;
54
55 let machine =
56 crate::step_machine::StepMachine::<crate::step_machine::state::Running>::from_instance(
57 instance.clone(),
58 );
59 let _ = machine
60 .fail(error_msg.to_string(), exit_code, &mut *tx)
61 .await?;
62
63 let attributes = [
64 opentelemetry::KeyValue::new("step_name", instance.step_name),
65 opentelemetry::KeyValue::new("step_type", instance.step_type),
66 opentelemetry::KeyValue::new("runner_id", instance.runner_id.unwrap_or_default()),
67 opentelemetry::KeyValue::new("error", error_msg.to_string()),
68 ];
69
70 crate::STEPS_FAILED.add(1, &attributes);
71
72 if let Some(started_at) = instance.started_at {
73 let duration = (Utc::now() - started_at).to_std().unwrap_or_default();
74 crate::STEP_DURATION.record(duration.as_secs_f64(), &attributes);
75 }
76
77 if let Some(outputs) = event
78 .outputs
79 .as_ref()
80 .map(|m| {
81 m.clone()
82 .into_iter()
83 .collect::<serde_json::Map<String, serde_json::Value>>()
84 })
85 .as_ref()
86 {
87 for (key, value) in outputs {
88 crate::db::upsert_step_output(&mut *tx, step_id, key, value).await?;
89 }
90 }
91
92 persist_step_test_reports(event.test_reports.as_ref(), &mut tx, run_id, step_id, &pool).await?;
94
95 let run = fetch_run(run_id, &mut *tx).await?;
96 let run_machine = WorkflowMachine::<state::Running>::new_from_run(run.clone());
97 let _ = run_machine
98 .fail(format!("Step {} failed: {}", step_id, error_msg), &mut *tx)
99 .await?;
100
101 let js = async_nats::jetstream::new(nats_client.clone());
102 use stormchaser_model::nats::NatsSubject;
103 if let Err(e) = publish_cloudevent(
104 &js,
105 NatsSubject::RunFailed,
106 EventType::Workflow(WorkflowEventType::Failed),
107 EventSource::Engine,
108 serde_json::to_value(WorkflowFailedEvent {
109 run_id,
110 event_type: EventType::Workflow(WorkflowEventType::Failed),
111 timestamp: chrono::Utc::now(),
112 })
113 .unwrap(),
114 None,
115 None,
116 )
117 .await
118 {
119 error!(
120 "Failed to publish workflow failed event for {}: {:?}",
121 run_id, e
122 );
123 }
124
125 crate::RUNS_FAILED.add(
126 1,
127 &[
128 opentelemetry::KeyValue::new("workflow_name", run.workflow_name),
129 opentelemetry::KeyValue::new("initiating_user", run.initiating_user),
130 opentelemetry::KeyValue::new("error", format!("Step {} failed", step_id)),
131 ],
132 );
133
134 tx.commit().await?;
135 info!("Workflow run {} failed, archiving...", run_id);
136 archive_workflow(run_id, pool.clone()).await?;
137
138 if let Err(e) = dispatch_pending_steps(run_id, pool, nats_client, tls_reloader).await {
139 error!(
140 "Failed to dispatch pending steps after failure for run {}: {:?}",
141 run_id, e
142 );
143 }
144
145 Ok(())
146}