stormchaser_engine/handler/step/events/
completed.rs1#![allow(clippy::explicit_auto_deref)]
2use crate::handler::{
3 archive_workflow, dispatch_pending_steps, fetch_outputs, fetch_run, fetch_run_context,
4 fetch_step_instance,
5};
6use crate::workflow_machine::{state, WorkflowMachine};
7use anyhow::{Context, Result};
8use chrono::Utc;
9use serde_json::Value;
10use sqlx::PgPool;
11use std::sync::Arc;
12use stormchaser_dsl::ast::{self, Workflow};
13use stormchaser_model::dsl::OutputExtraction;
14use stormchaser_model::events::WorkflowCompletedEvent;
15use stormchaser_model::step::{StepInstance, StepStatus};
16use stormchaser_model::LogBackend;
17use stormchaser_model::RunId;
18use stormchaser_model::StepInstanceId;
19use stormchaser_tls::TlsReloader;
20use tracing::{debug, error, info};
21use uuid::Uuid;
22
23use crate::handler::step::dispatch::{dispatch_step_instance, find_step};
24use crate::handler::step::quota::release_step_quota_for_instance;
25use crate::handler::step::scheduling::schedule_step;
26
27use super::helpers::persist_step_test_reports;
28
29#[tracing::instrument(skip(payload, pool, nats_client, log_backend, tls_reloader), fields(run_id = tracing::field::Empty, step_id = tracing::field::Empty))]
30pub async fn handle_step_completed(
32 payload: Value,
33 pool: PgPool,
34 nats_client: async_nats::Client,
35 log_backend: Arc<Option<LogBackend>>,
36 tls_reloader: Arc<TlsReloader>,
37) -> Result<()> {
38 let run_id_str = payload["run_id"].as_str().context("Missing run_id")?;
39 let run_id = uuid::Uuid::parse_str(run_id_str).map(RunId::new)?;
40 let step_id_str = payload["step_id"].as_str().context("Missing step_id")?;
41 let step_id = uuid::Uuid::parse_str(step_id_str).map(StepInstanceId::new)?;
42
43 let span = tracing::Span::current();
44 span.record("run_id", tracing::field::display(run_id));
45 span.record("step_id", tracing::field::display(step_id));
46
47 info!("Step {} (Run {}) completed successfully", step_id, run_id);
48
49 let mut tx = pool.begin().await?;
50
51 if crate::db::lock_workflow_run(&mut *tx, run_id)
53 .await?
54 .is_none()
55 {
56 debug!(
57 "Workflow run {} already archived or missing, skipping handler",
58 run_id
59 );
60 return Ok(());
61 }
62
63 let instance = fetch_step_instance(step_id, &mut *tx).await?;
65
66 if instance.status == StepStatus::Succeeded || instance.status == StepStatus::Failed {
68 return Ok(());
69 }
70
71 let _ = release_step_quota_for_instance(&mut *tx, run_id, step_id).await;
72
73 let machine =
74 crate::step_machine::StepMachine::<crate::step_machine::state::Running>::from_instance(
75 instance.clone(),
76 );
77 let _ = machine.succeed(&mut *tx).await?;
78
79 let attributes = [
80 opentelemetry::KeyValue::new("step_name", instance.step_name),
81 opentelemetry::KeyValue::new("step_type", instance.step_type),
82 opentelemetry::KeyValue::new("runner_id", instance.runner_id.unwrap_or_default()),
83 ];
84
85 crate::STEPS_COMPLETED.add(1, &attributes);
86
87 if let Some(started_at) = instance.started_at {
88 let duration = (Utc::now() - started_at).to_std().unwrap_or_default();
89 crate::STEP_DURATION.record(duration.as_secs_f64(), &attributes);
90 }
91
92 let all_steps: Vec<StepInstance> =
94 crate::db::get_step_instances_by_run_id(&mut *tx, run_id).await?;
95
96 if let Some(outputs) = payload["outputs"].as_object() {
98 for (key, value) in outputs {
99 crate::db::upsert_step_output(&mut *tx, step_id, key, value).await?;
100 }
101 }
102
103 if let Some(storage_hashes) = payload["storage_hashes"].as_object() {
105 for (name, hash_val) in storage_hashes {
106 if let Some(hash) = hash_val.as_str() {
107 crate::db::upsert_run_storage_state(&mut *tx, run_id.into_inner(), name, hash)
108 .await?;
109 }
110 }
111 }
112
113 if let Some(artifacts) = payload["artifacts"].as_object() {
115 let context = fetch_run_context(run_id, &mut *tx).await?;
116 let workflow: Workflow = serde_json::from_value(context.workflow_definition)
117 .context("Failed to parse workflow definition from DB")?;
118
119 for (name, meta) in artifacts {
120 let storage = workflow
122 .storage
123 .iter()
124 .find(|s| s.artifacts.iter().any(|a| a.name == *name));
125 if let Some(s) = storage {
126 let backend_id: Option<Uuid> = if let Some(ref backend_name) = s.backend {
127 crate::db::get_storage_backend_id_by_name(&mut *tx, backend_name).await?
128 } else {
129 crate::db::get_default_sfs_backend_id(&mut *tx).await?
130 };
131
132 if let Some(bid) = backend_id {
133 let remote_path = format!("artifacts/{}/{}/{}", run_id, s.name, name);
134 crate::db::insert_artifact_registry(
135 &mut *tx,
136 run_id,
137 step_id,
138 name,
139 stormchaser_model::BackendId::new(bid),
140 remote_path,
141 meta.clone(),
142 )
143 .await?;
144 }
145 }
146 }
147 }
148
149 persist_step_test_reports(&payload, &mut tx, run_id, step_id, &pool).await?;
151
152 let context = fetch_run_context(run_id, &mut *tx).await?;
154 let workflow: Workflow = serde_json::from_value(context.workflow_definition)
155 .context("Failed to parse workflow definition from DB")?;
156
157 let all_steps_initial: Vec<StepInstance> =
158 crate::db::get_step_instances_by_run_id(&mut *tx, run_id).await?;
159
160 let current_step_instance = all_steps_initial
161 .iter()
162 .find(|s| s.id.into_inner() == step_id.into_inner())
163 .context("Completed step not found in DB")?;
164
165 let mut dsl_step = find_step(&workflow.steps, ¤t_step_instance.step_name).cloned();
166
167 if let Some(step) = &mut dsl_step {
168 if step.r#type == "TerraformApply" {
169 step.outputs.push(OutputExtraction {
170 name: "terraform".to_string(),
171 source: "stdout".to_string(),
172 marker: Some("--- TF OUTPUTS ---".to_string()),
173 format: Some("json".to_string()),
174 regex: Some(r"--- TF OUTPUTS ---\s*(.*)".to_string()),
175 group: Some(1),
176 sensitive: Some(false),
177 });
178 } else if step.r#type == "TerraformPlan" {
179 step.outputs.push(OutputExtraction {
180 name: "plan_summary".to_string(),
181 source: "stdout".to_string(),
182 marker: Some("--- TF PLAN SUMMARY ---".to_string()),
183 format: Some("string".to_string()),
184 regex: Some(r"--- TF PLAN SUMMARY ---\s*(.*)".to_string()),
185 group: Some(1),
186 sensitive: Some(false),
187 });
188 step.outputs.push(OutputExtraction {
189 name: "plan_json".to_string(),
190 source: "stdout".to_string(),
191 marker: Some("--- TF PLAN JSON ---".to_string()),
192 format: Some("json".to_string()),
193 regex: Some(r"--- TF PLAN JSON ---\s*(.*)".to_string()),
194 group: Some(1),
195 sensitive: Some(false),
196 });
197 }
198 }
199
200 if let (Some(dsl_step), Some(backend)) = (&dsl_step, &*log_backend) {
201 if !dsl_step.outputs.is_empty() {
202 tracing::info!("Scraping outputs from logs for step {}", dsl_step.name);
203 let logs = backend
204 .fetch_step_logs(
205 &dsl_step.name,
206 step_id,
207 current_step_instance.started_at,
208 current_step_instance.finished_at,
209 Some(5000), )
211 .await
212 .unwrap_or_default();
213
214 let mut filtered_logs = Vec::new();
215 let mut in_output_block = dsl_step.start_marker.is_none();
216
217 for line in &logs {
218 if let Some(start) = &dsl_step.start_marker {
219 if line.contains(start) {
220 in_output_block = true;
221 continue;
222 }
223 }
224 if let Some(end) = &dsl_step.end_marker {
225 if line.contains(end) {
226 in_output_block = false;
227 continue;
228 }
229 }
230 if in_output_block {
231 filtered_logs.push(line);
232 }
233 }
234
235 for extraction in &dsl_step.outputs {
236 if extraction.source == "logs" || extraction.source == "stdout" {
237 if let Some(regex_str) = &extraction.regex {
238 if let Ok(re) = regex::Regex::new(regex_str) {
239 for line in filtered_logs.iter().rev() {
240 if let Some(caps) = re.captures(line) {
241 let value = if let Some(marker) = &extraction.marker {
242 if line.contains(marker) {
243 caps.get(extraction.group.unwrap_or(1) as usize)
244 .map(|m| m.as_str().to_string())
245 } else {
246 None
247 }
248 } else {
249 caps.get(extraction.group.unwrap_or(1) as usize)
250 .map(|m| m.as_str().to_string())
251 };
252
253 if let Some(val) = value {
254 let final_val =
255 if extraction.format.as_deref() == Some("json") {
256 serde_json::from_str(&val)
257 .unwrap_or(serde_json::json!(val))
258 } else {
259 serde_json::json!(val)
260 };
261
262 crate::db::upsert_step_output_with_sensitivity(
263 &mut *tx,
264 step_id,
265 &extraction.name,
266 &final_val,
267 extraction.sensitive.unwrap_or(false),
268 )
269 .await?;
270 break;
271 }
272 }
273 }
274 }
275 }
276 }
277 }
278 }
279 }
280
281 if let Some(dsl_step) = dsl_step {
282 let all_instances_of_this_step: Vec<&StepInstance> = all_steps
284 .iter()
285 .filter(|s| s.step_name == dsl_step.name)
286 .collect();
287
288 let finished_instances = all_instances_of_this_step
289 .iter()
290 .filter(|s| s.status == StepStatus::Succeeded || s.status == StepStatus::Skipped)
291 .count();
292
293 let total_instances = all_instances_of_this_step.len();
294
295 if finished_instances < total_instances {
296 let waiting_instances: Vec<&&StepInstance> = all_instances_of_this_step
298 .iter()
299 .filter(|s| s.status == StepStatus::WaitingForEvent)
300 .collect();
301
302 if !waiting_instances.is_empty() {
303 let running_or_pending = all_instances_of_this_step
304 .iter()
305 .filter(|s| s.status == StepStatus::Running || s.status == StepStatus::Pending)
306 .count();
307
308 let max_parallel = dsl_step
309 .strategy
310 .as_ref()
311 .and_then(|s| s.max_parallel)
312 .unwrap_or(u32::MAX);
313
314 if (running_or_pending as u32) < max_parallel {
315 let to_schedule = max_parallel - (running_or_pending as u32);
316 for next_instance in waiting_instances.iter().take(to_schedule as usize) {
317 let machine =
318 crate::step_machine::StepMachine::<
319 crate::step_machine::state::WaitingForEvent,
320 >::from_instance((**next_instance).clone());
321 let _ = machine.reschedule(&mut *tx).await?;
322
323 let inst_data: (Value, Value) =
324 crate::db::get_step_spec_and_params(&mut *tx, next_instance.id).await?;
325
326 dispatch_step_instance(
327 run_id,
328 next_instance.id,
329 &dsl_step.name,
330 &dsl_step.r#type,
331 &inst_data.0,
332 &inst_data.1,
333 nats_client.clone(),
334 pool.clone(),
335 tls_reloader.clone(),
336 )
337 .await?;
338 }
339 }
340 }
341 tx.commit().await?;
342 return Ok(());
343 }
344
345 if !dsl_step.next.is_empty() {
347 let hcl_ctx = crate::hcl_eval::create_context(
348 context.inputs.clone(),
349 run_id,
350 fetch_outputs(run_id, &mut *tx).await?,
351 );
352
353 for next_step_name in &dsl_step.next {
354 let predecessors: Vec<&ast::Step> = workflow
355 .steps
356 .iter()
357 .filter(|s| s.next.contains(next_step_name))
358 .collect();
359
360 let all_predecessors_done = predecessors.iter().all(|pred_dsl| {
361 let pred_instances: Vec<&StepInstance> = all_steps
362 .iter()
363 .filter(|s| s.step_name == pred_dsl.name)
364 .collect();
365
366 !pred_instances.is_empty()
367 && pred_instances.iter().all(|s| {
368 s.status == StepStatus::Succeeded || s.status == StepStatus::Skipped
369 })
370 });
371
372 if all_predecessors_done {
373 if let Some(next_dsl) =
374 workflow.steps.iter().find(|s| s.name == *next_step_name)
375 {
376 #[allow(clippy::explicit_auto_deref)]
377 schedule_step(
378 run_id,
379 next_dsl,
380 &mut *tx,
381 nats_client.clone(),
382 &hcl_ctx,
383 pool.clone(),
384 &workflow,
385 )
386 .await?;
387 }
388 }
389 }
390 }
391 }
392
393 let all_steps_final: Vec<StepInstance> =
395 crate::db::get_step_instances_by_run_id(&mut *tx, run_id).await?;
396
397 let all_dsl_steps_done = workflow.steps.iter().all(|dsl_step| {
398 let step_instances: Vec<&StepInstance> = all_steps_final
399 .iter()
400 .filter(|s| s.step_name == dsl_step.name)
401 .collect();
402
403 !step_instances.is_empty()
404 && step_instances
405 .iter()
406 .all(|s| s.status == StepStatus::Succeeded || s.status == StepStatus::Skipped)
407 });
408
409 if all_dsl_steps_done {
410 let run = fetch_run(run_id, &mut *tx).await?;
411 let machine = WorkflowMachine::<state::Running>::new_from_run(run.clone());
412 let _ = machine.succeed(&mut *tx).await?;
413
414 let js = async_nats::jetstream::new(nats_client.clone());
415 if let Err(e) = stormchaser_model::nats::publish_cloudevent(
416 &js,
417 "stormchaser.v1.run.completed",
418 "workflow_completed",
419 "stormchaser-engine",
420 serde_json::to_value(WorkflowCompletedEvent {
421 run_id,
422 event_type: "workflow_completed".to_string(),
423 timestamp: chrono::Utc::now(),
424 })
425 .unwrap(),
426 None,
427 None,
428 )
429 .await
430 {
431 error!(
432 "Failed to publish workflow completed event for {}: {:?}",
433 run_id, e
434 );
435 }
436
437 crate::RUNS_COMPLETED.add(
438 1,
439 &[
440 opentelemetry::KeyValue::new("workflow_name", run.workflow_name),
441 opentelemetry::KeyValue::new("initiating_user", run.initiating_user),
442 ],
443 );
444
445 tx.commit().await?;
446 info!(
447 "Workflow run {} completed successfully, archiving...",
448 run_id
449 );
450 archive_workflow(run_id, pool.clone()).await?;
451 return Ok(());
452 } else {
453 tx.commit().await?;
454 }
455
456 if let Err(e) = dispatch_pending_steps(run_id, pool, nats_client, tls_reloader).await {
457 error!(
458 "Failed to dispatch pending steps for run {}: {:?}",
459 run_id, e
460 );
461 }
462
463 Ok(())
464}