Skip to main content

stormchaser_engine/handler/step/events/
completed.rs

1#![allow(clippy::explicit_auto_deref)]
2use crate::handler::StepInstance;
3use crate::handler::{
4    archive_workflow, dispatch_pending_steps, fetch_outputs, fetch_run, fetch_run_context,
5    fetch_step_instance,
6};
7use crate::workflow_machine::{state, WorkflowMachine};
8use anyhow::{Context, Result};
9use chrono::Utc;
10use serde_json::Value;
11use sqlx::PgPool;
12use std::sync::Arc;
13use stormchaser_dsl::ast::{self, Workflow};
14use stormchaser_model::dsl::OutputExtraction;
15use stormchaser_model::events::WorkflowCompletedEvent;
16use stormchaser_model::events::{EventSource, EventType, WorkflowEventType};
17use stormchaser_model::nats::publish_cloudevent;
18use stormchaser_model::step::StepStatus;
19use stormchaser_model::LogBackend;
20use stormchaser_tls::TlsReloader;
21use tracing::{debug, error, info};
22use uuid::Uuid;
23
24use crate::handler::step::dispatch::{dispatch_step_instance, find_step};
25use crate::handler::step::quota::release_step_quota_for_instance;
26use crate::handler::step::scheduling::schedule_step;
27
28use super::helpers::persist_step_test_reports;
29
30#[tracing::instrument(skip(event, pool, nats_client, log_backend, tls_reloader), fields(run_id = tracing::field::Empty, step_id = tracing::field::Empty))]
31/// Handle step completed.
32pub async fn handle_step_completed(
33    event: stormchaser_model::events::StepCompletedEvent,
34    pool: PgPool,
35    nats_client: async_nats::Client,
36    log_backend: Arc<Option<LogBackend>>,
37    tls_reloader: Arc<TlsReloader>,
38) -> Result<()> {
39    let run_id = event.run_id;
40    let step_id = event.step_id;
41
42    let span = tracing::Span::current();
43    span.record("run_id", tracing::field::display(run_id));
44    span.record("step_id", tracing::field::display(step_id));
45
46    info!("Step {} (Run {}) completed successfully", step_id, run_id);
47
48    let mut tx = pool.begin().await?;
49
50    // Lock the workflow run to serialize state evaluations and prevent parallel join race conditions
51    if crate::db::lock_workflow_run(&mut *tx, run_id)
52        .await?
53        .is_none()
54    {
55        debug!(
56            "Workflow run {} already archived or missing, skipping handler",
57            run_id
58        );
59        return Ok(());
60    }
61
62    // 1. Update StepInstance status
63    let instance = fetch_step_instance(step_id, &mut *tx).await?;
64
65    // Ensure we don't process duplicate completion events
66    if instance.status == StepStatus::Succeeded || instance.status == StepStatus::Failed {
67        return Ok(());
68    }
69
70    let _ = release_step_quota_for_instance(&mut *tx, run_id, step_id).await;
71
72    let machine =
73        crate::step_machine::StepMachine::<crate::step_machine::state::Running>::from_instance(
74            instance.clone(),
75        );
76    let _ = machine.succeed(&mut *tx).await?;
77
78    let attributes = [
79        opentelemetry::KeyValue::new("step_name", instance.step_name),
80        opentelemetry::KeyValue::new("step_type", instance.step_type),
81        opentelemetry::KeyValue::new("runner_id", instance.runner_id.unwrap_or_default()),
82    ];
83
84    crate::STEPS_COMPLETED.add(1, &attributes);
85
86    if let Some(started_at) = instance.started_at {
87        let duration = (Utc::now() - started_at).to_std().unwrap_or_default();
88        crate::STEP_DURATION.record(duration.as_secs_f64(), &attributes);
89    }
90
91    // 2. Fetch ALL steps again to ensure we have the latest status for everyone
92    let all_steps: Vec<StepInstance> =
93        crate::db::get_step_instances_by_run_id(&mut *tx, run_id).await?;
94
95    // 3. Persist outputs
96    if let Some(outputs) = event
97        .outputs
98        .as_ref()
99        .map(|m| {
100            m.clone()
101                .into_iter()
102                .collect::<serde_json::Map<String, serde_json::Value>>()
103        })
104        .as_ref()
105    {
106        for (key, value) in outputs {
107            crate::db::upsert_step_output(&mut *tx, step_id, key, value).await?;
108        }
109    }
110
111    // 3.5 Persist storage hashes if provided
112    if let Some(storage_hashes) = event
113        .storage_hashes
114        .as_ref()
115        .map(|m| {
116            m.clone()
117                .into_iter()
118                .collect::<serde_json::Map<String, serde_json::Value>>()
119        })
120        .as_ref()
121    {
122        for (name, hash_val) in storage_hashes {
123            if let Some(hash) = hash_val.as_str() {
124                crate::db::upsert_run_storage_state(&mut *tx, run_id.into_inner(), name, hash)
125                    .await?;
126            }
127        }
128    }
129
130    // 3.6 Persist artifact metadata if provided
131    if let Some(artifacts) = event
132        .artifacts
133        .as_ref()
134        .map(|m| {
135            m.clone()
136                .into_iter()
137                .collect::<serde_json::Map<String, serde_json::Value>>()
138        })
139        .as_ref()
140    {
141        let context = fetch_run_context(run_id, &mut *tx).await?;
142        let workflow: Workflow = serde_json::from_value(context.workflow_definition)
143            .context("Failed to parse workflow definition from DB")?;
144
145        for (name, meta) in artifacts {
146            // Find which storage this artifact belongs to
147            let storage = workflow
148                .storage
149                .iter()
150                .find(|s| s.artifacts.iter().any(|a| a.name == *name));
151            if let Some(s) = storage {
152                let backend_id: Option<Uuid> = if let Some(ref backend_name) = s.backend {
153                    crate::db::get_storage_backend_id_by_name(&mut *tx, backend_name).await?
154                } else {
155                    crate::db::get_default_sfs_backend_id(&mut *tx).await?
156                };
157
158                if let Some(bid) = backend_id {
159                    let remote_path = format!("artifacts/{}/{}/{}", run_id, s.name, name);
160                    crate::db::insert_artifact_registry(
161                        &mut *tx,
162                        run_id,
163                        step_id,
164                        name,
165                        stormchaser_model::BackendId::new(bid),
166                        remote_path,
167                        meta.clone(),
168                    )
169                    .await?;
170                }
171            }
172        }
173    }
174
175    // 3.8 Persist test reports if provided
176    persist_step_test_reports(event.test_reports.as_ref(), &mut tx, run_id, step_id, &pool).await?;
177
178    // 2.5 Scrape outputs from logs if configured
179    let context = fetch_run_context(run_id, &mut *tx).await?;
180    let workflow: Workflow = serde_json::from_value(context.workflow_definition)
181        .context("Failed to parse workflow definition from DB")?;
182
183    let all_steps_initial: Vec<StepInstance> =
184        crate::db::get_step_instances_by_run_id(&mut *tx, run_id).await?;
185
186    let current_step_instance = all_steps_initial
187        .iter()
188        .find(|s| s.id.into_inner() == step_id.into_inner())
189        .context("Completed step not found in DB")?;
190
191    let mut dsl_step = find_step(&workflow.steps, &current_step_instance.step_name).cloned();
192
193    if let Some(step) = &mut dsl_step {
194        if step.r#type == "TerraformApply" {
195            step.outputs.push(OutputExtraction {
196                name: "terraform".to_string(),
197                source: "stdout".to_string(),
198                marker: Some("--- TF OUTPUTS ---".to_string()),
199                format: Some("json".to_string()),
200                regex: Some(r"--- TF OUTPUTS ---\s*(.*)".to_string()),
201                group: Some(1),
202                sensitive: Some(false),
203            });
204        } else if step.r#type == "TerraformPlan" {
205            step.outputs.push(OutputExtraction {
206                name: "plan_summary".to_string(),
207                source: "stdout".to_string(),
208                marker: Some("--- TF PLAN SUMMARY ---".to_string()),
209                format: Some("string".to_string()),
210                regex: Some(r"--- TF PLAN SUMMARY ---\s*(.*)".to_string()),
211                group: Some(1),
212                sensitive: Some(false),
213            });
214            step.outputs.push(OutputExtraction {
215                name: "plan_json".to_string(),
216                source: "stdout".to_string(),
217                marker: Some("--- TF PLAN JSON ---".to_string()),
218                format: Some("json".to_string()),
219                regex: Some(r"--- TF PLAN JSON ---\s*(.*)".to_string()),
220                group: Some(1),
221                sensitive: Some(false),
222            });
223        }
224    }
225
226    if let (Some(dsl_step), Some(backend)) = (&dsl_step, &*log_backend) {
227        if !dsl_step.outputs.is_empty() {
228            tracing::info!("Scraping outputs from logs for step {}", dsl_step.name);
229            let logs = backend
230                .fetch_step_logs(
231                    &dsl_step.name,
232                    step_id,
233                    current_step_instance.started_at,
234                    current_step_instance.finished_at,
235                    Some(5000), // Get up to 5000 lines for output scraping
236                )
237                .await
238                .unwrap_or_default();
239
240            let mut filtered_logs = Vec::new();
241            let mut in_output_block = dsl_step.start_marker.is_none();
242
243            for line in &logs {
244                if let Some(start) = &dsl_step.start_marker {
245                    if line.contains(start) {
246                        in_output_block = true;
247                        continue;
248                    }
249                }
250                if let Some(end) = &dsl_step.end_marker {
251                    if line.contains(end) {
252                        in_output_block = false;
253                        continue;
254                    }
255                }
256                if in_output_block {
257                    filtered_logs.push(line);
258                }
259            }
260
261            for extraction in &dsl_step.outputs {
262                if extraction.source == "logs" || extraction.source == "stdout" {
263                    if let Some(regex_str) = &extraction.regex {
264                        if let Ok(re) = regex::Regex::new(regex_str) {
265                            for line in filtered_logs.iter().rev() {
266                                if let Some(caps) = re.captures(line) {
267                                    let value = if let Some(marker) = &extraction.marker {
268                                        if line.contains(marker) {
269                                            caps.get(extraction.group.unwrap_or(1) as usize)
270                                                .map(|m| m.as_str().to_string())
271                                        } else {
272                                            None
273                                        }
274                                    } else {
275                                        caps.get(extraction.group.unwrap_or(1) as usize)
276                                            .map(|m| m.as_str().to_string())
277                                    };
278
279                                    if let Some(val) = value {
280                                        let final_val =
281                                            if extraction.format.as_deref() == Some("json") {
282                                                serde_json::from_str(&val)
283                                                    .unwrap_or(serde_json::json!(val))
284                                            } else {
285                                                serde_json::json!(val)
286                                            };
287
288                                        crate::db::upsert_step_output_with_sensitivity(
289                                            &mut *tx,
290                                            step_id,
291                                            &extraction.name,
292                                            &final_val,
293                                            extraction.sensitive.unwrap_or(false),
294                                        )
295                                        .await?;
296                                        break;
297                                    }
298                                }
299                            }
300                        }
301                    }
302                }
303            }
304        }
305    }
306
307    if let Some(dsl_step) = dsl_step {
308        if !process_step_completion(
309            dsl_step,
310            &all_steps,
311            run_id,
312            &mut *tx,
313            nats_client.clone(),
314            pool.clone(),
315            tls_reloader.clone(),
316            context.inputs.clone(),
317            &workflow,
318        )
319        .await?
320        {
321            tx.commit().await?;
322            return Ok(());
323        }
324    }
325
326    // 5. Check if all steps in the workflow are Done
327    let all_steps_final: Vec<StepInstance> =
328        crate::db::get_step_instances_by_run_id(&mut *tx, run_id).await?;
329
330    let all_dsl_steps_done = workflow.steps.iter().all(|dsl_step| {
331        let step_instances: Vec<&StepInstance> = all_steps_final
332            .iter()
333            .filter(|s| s.step_name == dsl_step.name)
334            .collect();
335
336        !step_instances.is_empty()
337            && step_instances
338                .iter()
339                .all(|s| s.status == StepStatus::Succeeded || s.status == StepStatus::Skipped)
340    });
341
342    if all_dsl_steps_done {
343        let run = fetch_run(run_id, &mut *tx).await?;
344        let machine = WorkflowMachine::<state::Running>::new_from_run(run.clone());
345        let _ = machine.succeed(&mut *tx).await?;
346
347        let js = async_nats::jetstream::new(nats_client.clone());
348        use stormchaser_model::nats::NatsSubject;
349        if let Err(e) = publish_cloudevent(
350            &js,
351            NatsSubject::RunCompleted,
352            EventType::Workflow(WorkflowEventType::Completed),
353            EventSource::Engine,
354            serde_json::to_value(WorkflowCompletedEvent {
355                run_id,
356                event_type: EventType::Workflow(WorkflowEventType::Completed),
357                timestamp: chrono::Utc::now(),
358            })
359            .unwrap(),
360            None,
361            None,
362        )
363        .await
364        {
365            error!(
366                "Failed to publish workflow completed event for {}: {:?}",
367                run_id, e
368            );
369        }
370
371        crate::RUNS_COMPLETED.add(
372            1,
373            &[
374                opentelemetry::KeyValue::new("workflow_name", run.workflow_name),
375                opentelemetry::KeyValue::new("initiating_user", run.initiating_user),
376            ],
377        );
378
379        tx.commit().await?;
380        info!(
381            "Workflow run {} completed successfully, archiving...",
382            run_id
383        );
384        archive_workflow(run_id, pool.clone()).await?;
385        return Ok(());
386    } else {
387        tx.commit().await?;
388    }
389
390    if let Err(e) = dispatch_pending_steps(run_id, pool, nats_client, tls_reloader).await {
391        error!(
392            "Failed to dispatch pending steps for run {}: {:?}",
393            run_id, e
394        );
395    }
396
397    Ok(())
398}
399
400#[allow(clippy::too_many_arguments)]
401async fn process_step_completion(
402    dsl_step: ast::Step,
403    all_steps: &[StepInstance],
404    run_id: stormchaser_model::RunId,
405    tx: &mut sqlx::PgConnection,
406    nats_client: async_nats::Client,
407    pool: PgPool,
408    tls_reloader: Arc<TlsReloader>,
409    inputs: serde_json::Value,
410    workflow: &ast::Workflow,
411) -> Result<bool> {
412    // 3.1 Check if this step is iterated and if we need to schedule more batches
413    let all_instances_of_this_step: Vec<&StepInstance> = all_steps
414        .iter()
415        .filter(|s| s.step_name == dsl_step.name)
416        .collect();
417
418    let finished_instances = all_instances_of_this_step
419        .iter()
420        .filter(|s| s.status == StepStatus::Succeeded || s.status == StepStatus::Skipped)
421        .count();
422
423    let total_instances = all_instances_of_this_step.len();
424
425    if finished_instances < total_instances {
426        // Some instances are still running or waiting
427        let waiting_instances: Vec<&&StepInstance> = all_instances_of_this_step
428            .iter()
429            .filter(|s| s.status == StepStatus::WaitingForEvent)
430            .collect();
431
432        if !waiting_instances.is_empty() {
433            let running_or_pending = all_instances_of_this_step
434                .iter()
435                .filter(|s| s.status == StepStatus::Running || s.status == StepStatus::Pending)
436                .count();
437
438            let max_parallel = dsl_step
439                .strategy
440                .as_ref()
441                .and_then(|s| s.max_parallel)
442                .unwrap_or(u32::MAX);
443
444            if (running_or_pending as u32) < max_parallel {
445                let to_schedule = max_parallel - (running_or_pending as u32);
446                for next_instance in waiting_instances.iter().take(to_schedule as usize) {
447                    let machine = crate::step_machine::StepMachine::<
448                        crate::step_machine::state::WaitingForEvent,
449                    >::from_instance((**next_instance).clone());
450                    let _ = machine.reschedule(&mut *tx).await?;
451
452                    let inst_data: (Value, Value) =
453                        crate::db::get_step_spec_and_params(&mut *tx, next_instance.id).await?;
454
455                    dispatch_step_instance(
456                        run_id,
457                        next_instance.id,
458                        &dsl_step.name,
459                        &dsl_step.r#type,
460                        &inst_data.0,
461                        &inst_data.1,
462                        nats_client.clone(),
463                        pool.clone(),
464                        tls_reloader.clone(),
465                    )
466                    .await?;
467                }
468            }
469        }
470        return Ok(false);
471    }
472
473    // 4. All instances of THIS step are done, evaluate successors
474    if !dsl_step.next.is_empty() {
475        let hcl_ctx =
476            crate::hcl_eval::create_context(inputs, run_id, fetch_outputs(run_id, &mut *tx).await?);
477
478        for next_step_name in &dsl_step.next {
479            let predecessors: Vec<&ast::Step> = workflow
480                .steps
481                .iter()
482                .filter(|s| s.next.contains(next_step_name))
483                .collect();
484
485            let all_predecessors_done = predecessors.iter().all(|pred_dsl| {
486                let pred_instances: Vec<&StepInstance> = all_steps
487                    .iter()
488                    .filter(|s| s.step_name == pred_dsl.name)
489                    .collect();
490
491                !pred_instances.is_empty()
492                    && pred_instances.iter().all(|s| {
493                        s.status == StepStatus::Succeeded || s.status == StepStatus::Skipped
494                    })
495            });
496
497            if all_predecessors_done {
498                if let Some(next_dsl) = workflow.steps.iter().find(|s| s.name == *next_step_name) {
499                    #[allow(clippy::explicit_auto_deref)]
500                    schedule_step(
501                        run_id,
502                        next_dsl,
503                        &mut *tx,
504                        nats_client.clone(),
505                        &hcl_ctx,
506                        pool.clone(),
507                        workflow,
508                    )
509                    .await?;
510                }
511            }
512        }
513    }
514
515    Ok(true)
516}
517
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-only smoke check: binding the function item proves that
    /// `process_step_completion` still exists and type-checks. It is marked
    /// `#[ignore]`, so it only runs when ignored tests are requested.
    #[tokio::test]
    #[ignore]
    async fn test_process_step_completion_compiles() {
        let _completion_fn = process_step_completion;
    }
}
527}