Skip to main content

stormchaser_engine/handler/step/events/
completed.rs

1#![allow(clippy::explicit_auto_deref)]
2use crate::handler::{
3    archive_workflow, dispatch_pending_steps, fetch_outputs, fetch_run, fetch_run_context,
4    fetch_step_instance,
5};
6use crate::workflow_machine::{state, WorkflowMachine};
7use anyhow::{Context, Result};
8use chrono::Utc;
9use serde_json::Value;
10use sqlx::PgPool;
11use std::sync::Arc;
12use stormchaser_dsl::ast::{self, Workflow};
13use stormchaser_model::dsl::OutputExtraction;
14use stormchaser_model::events::WorkflowCompletedEvent;
15use stormchaser_model::step::{StepInstance, StepStatus};
16use stormchaser_model::LogBackend;
17use stormchaser_model::RunId;
18use stormchaser_model::StepInstanceId;
19use stormchaser_tls::TlsReloader;
20use tracing::{debug, error, info};
21use uuid::Uuid;
22
23use crate::handler::step::dispatch::{dispatch_step_instance, find_step};
24use crate::handler::step::quota::release_step_quota_for_instance;
25use crate::handler::step::scheduling::schedule_step;
26
27use super::helpers::persist_step_test_reports;
28
29#[tracing::instrument(skip(payload, pool, nats_client, log_backend, tls_reloader), fields(run_id = tracing::field::Empty, step_id = tracing::field::Empty))]
30/// Handle step completed.
31pub async fn handle_step_completed(
32    payload: Value,
33    pool: PgPool,
34    nats_client: async_nats::Client,
35    log_backend: Arc<Option<LogBackend>>,
36    tls_reloader: Arc<TlsReloader>,
37) -> Result<()> {
38    let run_id_str = payload["run_id"].as_str().context("Missing run_id")?;
39    let run_id = uuid::Uuid::parse_str(run_id_str).map(RunId::new)?;
40    let step_id_str = payload["step_id"].as_str().context("Missing step_id")?;
41    let step_id = uuid::Uuid::parse_str(step_id_str).map(StepInstanceId::new)?;
42
43    let span = tracing::Span::current();
44    span.record("run_id", tracing::field::display(run_id));
45    span.record("step_id", tracing::field::display(step_id));
46
47    info!("Step {} (Run {}) completed successfully", step_id, run_id);
48
49    let mut tx = pool.begin().await?;
50
51    // Lock the workflow run to serialize state evaluations and prevent parallel join race conditions
52    if crate::db::lock_workflow_run(&mut *tx, run_id)
53        .await?
54        .is_none()
55    {
56        debug!(
57            "Workflow run {} already archived or missing, skipping handler",
58            run_id
59        );
60        return Ok(());
61    }
62
63    // 1. Update StepInstance status
64    let instance = fetch_step_instance(step_id, &mut *tx).await?;
65
66    // Ensure we don't process duplicate completion events
67    if instance.status == StepStatus::Succeeded || instance.status == StepStatus::Failed {
68        return Ok(());
69    }
70
71    let _ = release_step_quota_for_instance(&mut *tx, run_id, step_id).await;
72
73    let machine =
74        crate::step_machine::StepMachine::<crate::step_machine::state::Running>::from_instance(
75            instance.clone(),
76        );
77    let _ = machine.succeed(&mut *tx).await?;
78
79    let attributes = [
80        opentelemetry::KeyValue::new("step_name", instance.step_name),
81        opentelemetry::KeyValue::new("step_type", instance.step_type),
82        opentelemetry::KeyValue::new("runner_id", instance.runner_id.unwrap_or_default()),
83    ];
84
85    crate::STEPS_COMPLETED.add(1, &attributes);
86
87    if let Some(started_at) = instance.started_at {
88        let duration = (Utc::now() - started_at).to_std().unwrap_or_default();
89        crate::STEP_DURATION.record(duration.as_secs_f64(), &attributes);
90    }
91
92    // 2. Fetch ALL steps again to ensure we have the latest status for everyone
93    let all_steps: Vec<StepInstance> =
94        crate::db::get_step_instances_by_run_id(&mut *tx, run_id).await?;
95
96    // 3. Persist outputs
97    if let Some(outputs) = payload["outputs"].as_object() {
98        for (key, value) in outputs {
99            crate::db::upsert_step_output(&mut *tx, step_id, key, value).await?;
100        }
101    }
102
103    // 3.5 Persist storage hashes if provided
104    if let Some(storage_hashes) = payload["storage_hashes"].as_object() {
105        for (name, hash_val) in storage_hashes {
106            if let Some(hash) = hash_val.as_str() {
107                crate::db::upsert_run_storage_state(&mut *tx, run_id.into_inner(), name, hash)
108                    .await?;
109            }
110        }
111    }
112
113    // 3.6 Persist artifact metadata if provided
114    if let Some(artifacts) = payload["artifacts"].as_object() {
115        let context = fetch_run_context(run_id, &mut *tx).await?;
116        let workflow: Workflow = serde_json::from_value(context.workflow_definition)
117            .context("Failed to parse workflow definition from DB")?;
118
119        for (name, meta) in artifacts {
120            // Find which storage this artifact belongs to
121            let storage = workflow
122                .storage
123                .iter()
124                .find(|s| s.artifacts.iter().any(|a| a.name == *name));
125            if let Some(s) = storage {
126                let backend_id: Option<Uuid> = if let Some(ref backend_name) = s.backend {
127                    crate::db::get_storage_backend_id_by_name(&mut *tx, backend_name).await?
128                } else {
129                    crate::db::get_default_sfs_backend_id(&mut *tx).await?
130                };
131
132                if let Some(bid) = backend_id {
133                    let remote_path = format!("artifacts/{}/{}/{}", run_id, s.name, name);
134                    crate::db::insert_artifact_registry(
135                        &mut *tx,
136                        run_id,
137                        step_id,
138                        name,
139                        stormchaser_model::BackendId::new(bid),
140                        remote_path,
141                        meta.clone(),
142                    )
143                    .await?;
144                }
145            }
146        }
147    }
148
149    // 3.8 Persist test reports if provided
150    persist_step_test_reports(&payload, &mut tx, run_id, step_id, &pool).await?;
151
152    // 2.5 Scrape outputs from logs if configured
153    let context = fetch_run_context(run_id, &mut *tx).await?;
154    let workflow: Workflow = serde_json::from_value(context.workflow_definition)
155        .context("Failed to parse workflow definition from DB")?;
156
157    let all_steps_initial: Vec<StepInstance> =
158        crate::db::get_step_instances_by_run_id(&mut *tx, run_id).await?;
159
160    let current_step_instance = all_steps_initial
161        .iter()
162        .find(|s| s.id.into_inner() == step_id.into_inner())
163        .context("Completed step not found in DB")?;
164
165    let mut dsl_step = find_step(&workflow.steps, &current_step_instance.step_name).cloned();
166
167    if let Some(step) = &mut dsl_step {
168        if step.r#type == "TerraformApply" {
169            step.outputs.push(OutputExtraction {
170                name: "terraform".to_string(),
171                source: "stdout".to_string(),
172                marker: Some("--- TF OUTPUTS ---".to_string()),
173                format: Some("json".to_string()),
174                regex: Some(r"--- TF OUTPUTS ---\s*(.*)".to_string()),
175                group: Some(1),
176                sensitive: Some(false),
177            });
178        } else if step.r#type == "TerraformPlan" {
179            step.outputs.push(OutputExtraction {
180                name: "plan_summary".to_string(),
181                source: "stdout".to_string(),
182                marker: Some("--- TF PLAN SUMMARY ---".to_string()),
183                format: Some("string".to_string()),
184                regex: Some(r"--- TF PLAN SUMMARY ---\s*(.*)".to_string()),
185                group: Some(1),
186                sensitive: Some(false),
187            });
188            step.outputs.push(OutputExtraction {
189                name: "plan_json".to_string(),
190                source: "stdout".to_string(),
191                marker: Some("--- TF PLAN JSON ---".to_string()),
192                format: Some("json".to_string()),
193                regex: Some(r"--- TF PLAN JSON ---\s*(.*)".to_string()),
194                group: Some(1),
195                sensitive: Some(false),
196            });
197        }
198    }
199
200    if let (Some(dsl_step), Some(backend)) = (&dsl_step, &*log_backend) {
201        if !dsl_step.outputs.is_empty() {
202            tracing::info!("Scraping outputs from logs for step {}", dsl_step.name);
203            let logs = backend
204                .fetch_step_logs(
205                    &dsl_step.name,
206                    step_id,
207                    current_step_instance.started_at,
208                    current_step_instance.finished_at,
209                    Some(5000), // Get up to 5000 lines for output scraping
210                )
211                .await
212                .unwrap_or_default();
213
214            let mut filtered_logs = Vec::new();
215            let mut in_output_block = dsl_step.start_marker.is_none();
216
217            for line in &logs {
218                if let Some(start) = &dsl_step.start_marker {
219                    if line.contains(start) {
220                        in_output_block = true;
221                        continue;
222                    }
223                }
224                if let Some(end) = &dsl_step.end_marker {
225                    if line.contains(end) {
226                        in_output_block = false;
227                        continue;
228                    }
229                }
230                if in_output_block {
231                    filtered_logs.push(line);
232                }
233            }
234
235            for extraction in &dsl_step.outputs {
236                if extraction.source == "logs" || extraction.source == "stdout" {
237                    if let Some(regex_str) = &extraction.regex {
238                        if let Ok(re) = regex::Regex::new(regex_str) {
239                            for line in filtered_logs.iter().rev() {
240                                if let Some(caps) = re.captures(line) {
241                                    let value = if let Some(marker) = &extraction.marker {
242                                        if line.contains(marker) {
243                                            caps.get(extraction.group.unwrap_or(1) as usize)
244                                                .map(|m| m.as_str().to_string())
245                                        } else {
246                                            None
247                                        }
248                                    } else {
249                                        caps.get(extraction.group.unwrap_or(1) as usize)
250                                            .map(|m| m.as_str().to_string())
251                                    };
252
253                                    if let Some(val) = value {
254                                        let final_val =
255                                            if extraction.format.as_deref() == Some("json") {
256                                                serde_json::from_str(&val)
257                                                    .unwrap_or(serde_json::json!(val))
258                                            } else {
259                                                serde_json::json!(val)
260                                            };
261
262                                        crate::db::upsert_step_output_with_sensitivity(
263                                            &mut *tx,
264                                            step_id,
265                                            &extraction.name,
266                                            &final_val,
267                                            extraction.sensitive.unwrap_or(false),
268                                        )
269                                        .await?;
270                                        break;
271                                    }
272                                }
273                            }
274                        }
275                    }
276                }
277            }
278        }
279    }
280
281    if let Some(dsl_step) = dsl_step {
282        // 3.1 Check if this step is iterated and if we need to schedule more batches
283        let all_instances_of_this_step: Vec<&StepInstance> = all_steps
284            .iter()
285            .filter(|s| s.step_name == dsl_step.name)
286            .collect();
287
288        let finished_instances = all_instances_of_this_step
289            .iter()
290            .filter(|s| s.status == StepStatus::Succeeded || s.status == StepStatus::Skipped)
291            .count();
292
293        let total_instances = all_instances_of_this_step.len();
294
295        if finished_instances < total_instances {
296            // Some instances are still running or waiting
297            let waiting_instances: Vec<&&StepInstance> = all_instances_of_this_step
298                .iter()
299                .filter(|s| s.status == StepStatus::WaitingForEvent)
300                .collect();
301
302            if !waiting_instances.is_empty() {
303                let running_or_pending = all_instances_of_this_step
304                    .iter()
305                    .filter(|s| s.status == StepStatus::Running || s.status == StepStatus::Pending)
306                    .count();
307
308                let max_parallel = dsl_step
309                    .strategy
310                    .as_ref()
311                    .and_then(|s| s.max_parallel)
312                    .unwrap_or(u32::MAX);
313
314                if (running_or_pending as u32) < max_parallel {
315                    let to_schedule = max_parallel - (running_or_pending as u32);
316                    for next_instance in waiting_instances.iter().take(to_schedule as usize) {
317                        let machine =
318                            crate::step_machine::StepMachine::<
319                                crate::step_machine::state::WaitingForEvent,
320                            >::from_instance((**next_instance).clone());
321                        let _ = machine.reschedule(&mut *tx).await?;
322
323                        let inst_data: (Value, Value) =
324                            crate::db::get_step_spec_and_params(&mut *tx, next_instance.id).await?;
325
326                        dispatch_step_instance(
327                            run_id,
328                            next_instance.id,
329                            &dsl_step.name,
330                            &dsl_step.r#type,
331                            &inst_data.0,
332                            &inst_data.1,
333                            nats_client.clone(),
334                            pool.clone(),
335                            tls_reloader.clone(),
336                        )
337                        .await?;
338                    }
339                }
340            }
341            tx.commit().await?;
342            return Ok(());
343        }
344
345        // 4. All instances of THIS step are done, evaluate successors
346        if !dsl_step.next.is_empty() {
347            let hcl_ctx = crate::hcl_eval::create_context(
348                context.inputs.clone(),
349                run_id,
350                fetch_outputs(run_id, &mut *tx).await?,
351            );
352
353            for next_step_name in &dsl_step.next {
354                let predecessors: Vec<&ast::Step> = workflow
355                    .steps
356                    .iter()
357                    .filter(|s| s.next.contains(next_step_name))
358                    .collect();
359
360                let all_predecessors_done = predecessors.iter().all(|pred_dsl| {
361                    let pred_instances: Vec<&StepInstance> = all_steps
362                        .iter()
363                        .filter(|s| s.step_name == pred_dsl.name)
364                        .collect();
365
366                    !pred_instances.is_empty()
367                        && pred_instances.iter().all(|s| {
368                            s.status == StepStatus::Succeeded || s.status == StepStatus::Skipped
369                        })
370                });
371
372                if all_predecessors_done {
373                    if let Some(next_dsl) =
374                        workflow.steps.iter().find(|s| s.name == *next_step_name)
375                    {
376                        #[allow(clippy::explicit_auto_deref)]
377                        schedule_step(
378                            run_id,
379                            next_dsl,
380                            &mut *tx,
381                            nats_client.clone(),
382                            &hcl_ctx,
383                            pool.clone(),
384                            &workflow,
385                        )
386                        .await?;
387                    }
388                }
389            }
390        }
391    }
392
393    // 5. Check if all steps in the workflow are Done
394    let all_steps_final: Vec<StepInstance> =
395        crate::db::get_step_instances_by_run_id(&mut *tx, run_id).await?;
396
397    let all_dsl_steps_done = workflow.steps.iter().all(|dsl_step| {
398        let step_instances: Vec<&StepInstance> = all_steps_final
399            .iter()
400            .filter(|s| s.step_name == dsl_step.name)
401            .collect();
402
403        !step_instances.is_empty()
404            && step_instances
405                .iter()
406                .all(|s| s.status == StepStatus::Succeeded || s.status == StepStatus::Skipped)
407    });
408
409    if all_dsl_steps_done {
410        let run = fetch_run(run_id, &mut *tx).await?;
411        let machine = WorkflowMachine::<state::Running>::new_from_run(run.clone());
412        let _ = machine.succeed(&mut *tx).await?;
413
414        let js = async_nats::jetstream::new(nats_client.clone());
415        if let Err(e) = stormchaser_model::nats::publish_cloudevent(
416            &js,
417            "stormchaser.v1.run.completed",
418            "workflow_completed",
419            "stormchaser-engine",
420            serde_json::to_value(WorkflowCompletedEvent {
421                run_id,
422                event_type: "workflow_completed".to_string(),
423                timestamp: chrono::Utc::now(),
424            })
425            .unwrap(),
426            None,
427            None,
428        )
429        .await
430        {
431            error!(
432                "Failed to publish workflow completed event for {}: {:?}",
433                run_id, e
434            );
435        }
436
437        crate::RUNS_COMPLETED.add(
438            1,
439            &[
440                opentelemetry::KeyValue::new("workflow_name", run.workflow_name),
441                opentelemetry::KeyValue::new("initiating_user", run.initiating_user),
442            ],
443        );
444
445        tx.commit().await?;
446        info!(
447            "Workflow run {} completed successfully, archiving...",
448            run_id
449        );
450        archive_workflow(run_id, pool.clone()).await?;
451        return Ok(());
452    } else {
453        tx.commit().await?;
454    }
455
456    if let Err(e) = dispatch_pending_steps(run_id, pool, nats_client, tls_reloader).await {
457        error!(
458            "Failed to dispatch pending steps for run {}: {:?}",
459            run_id, e
460        );
461    }
462
463    Ok(())
464}