// stormchaser_engine/handler/step/events.rs

1#![allow(clippy::explicit_auto_deref)]
2use crate::handler::{
3    archive_workflow, dispatch_pending_steps, fetch_outputs, fetch_run, fetch_run_context,
4    fetch_step_instance,
5};
6use crate::workflow_machine::{state, WorkflowMachine};
7use anyhow::{Context, Result};
8use chrono::Utc;
9use flate2::read::GzDecoder;
10use serde_json::Value;
11use sqlx::{PgPool, Postgres, Transaction};
12use std::io::Read;
13use std::sync::Arc;
14use stormchaser_dsl::ast::Workflow;
15use stormchaser_model::step::{StepInstance, StepStatus};
16use stormchaser_tls::TlsReloader;
17use tar::Archive;
18use tracing::{debug, error, info};
19use uuid::Uuid;
20
21use super::dispatch::dispatch_step_instance;
22use super::quota::release_step_quota_for_instance;
23use super::scheduling::schedule_step;
24
25use stormchaser_dsl::ast;
26use stormchaser_model::LogBackend;
27use stormchaser_model::StorageBackend;
28
29#[tracing::instrument(skip(payload, pool), fields(run_id = tracing::field::Empty, step_id = tracing::field::Empty))]
30/// Handle step unpacking sfs.
31pub async fn handle_step_unpacking_sfs(payload: Value, pool: PgPool) -> Result<()> {
32    let run_id_str = payload["run_id"].as_str().context("Missing run_id")?;
33    let run_id = Uuid::parse_str(run_id_str)?;
34    let step_id_str = payload["step_id"].as_str().context("Missing step_id")?;
35    let step_id = Uuid::parse_str(step_id_str)?;
36
37    let span = tracing::Span::current();
38    span.record("run_id", tracing::field::display(run_id));
39    span.record("step_id", tracing::field::display(step_id));
40    let runner_id = payload["runner_id"].as_str().unwrap_or("unknown");
41
42    info!(
43        "Step {} (Run {}) is now unpacking SFS on runner {}",
44        step_id, run_id, runner_id
45    );
46
47    let instance = fetch_step_instance(step_id, &pool).await?;
48    let machine =
49        crate::step_machine::StepMachine::<crate::step_machine::state::Pending>::from_instance(
50            instance,
51        );
52    let _ = machine
53        .start_unpacking(runner_id.to_string(), &mut *pool.acquire().await?)
54        .await?;
55
56    Ok(())
57}
58
59#[tracing::instrument(skip(payload, pool), fields(run_id = tracing::field::Empty, step_id = tracing::field::Empty))]
60/// Handle step packing sfs.
61pub async fn handle_step_packing_sfs(payload: Value, pool: PgPool) -> Result<()> {
62    let run_id_str = payload["run_id"].as_str().context("Missing run_id")?;
63    let run_id = Uuid::parse_str(run_id_str)?;
64    let step_id_str = payload["step_id"].as_str().context("Missing step_id")?;
65    let step_id = Uuid::parse_str(step_id_str)?;
66
67    let span = tracing::Span::current();
68    span.record("run_id", tracing::field::display(run_id));
69    span.record("step_id", tracing::field::display(step_id));
70
71    info!("Step {} (Run {}) is now packing SFS", step_id, run_id);
72
73    let instance = fetch_step_instance(step_id, &pool).await?;
74    let machine =
75        crate::step_machine::StepMachine::<crate::step_machine::state::Running>::from_instance(
76            instance,
77        );
78    let _ = machine.start_packing(&mut *pool.acquire().await?).await?;
79
80    Ok(())
81}
82
83#[tracing::instrument(skip(payload, pool), fields(run_id = tracing::field::Empty, step_id = tracing::field::Empty))]
84/// Handle step running.
85pub async fn handle_step_running(payload: Value, pool: PgPool) -> Result<()> {
86    let run_id_str = payload["run_id"].as_str().context("Missing run_id")?;
87    let run_id = Uuid::parse_str(run_id_str)?;
88    let step_id_str = payload["step_id"].as_str().context("Missing step_id")?;
89    let step_id = Uuid::parse_str(step_id_str)?;
90
91    let span = tracing::Span::current();
92    span.record("run_id", tracing::field::display(run_id));
93    span.record("step_id", tracing::field::display(step_id));
94    let runner_id = payload["runner_id"].as_str().unwrap_or("unknown");
95
96    info!(
97        "Step {} (Run {}) is now running on runner {}",
98        step_id, run_id, runner_id
99    );
100
101    // 1. Fetch current instance
102    let instance = fetch_step_instance(step_id, &pool).await?;
103
104    // 2. Use state machine to transition
105    let machine =
106        crate::step_machine::StepMachine::<crate::step_machine::state::Pending>::from_instance(
107            instance.clone(),
108        );
109    let _ = machine
110        .start(runner_id.to_string(), &mut *pool.acquire().await?)
111        .await?;
112
113    crate::STEPS_STARTED.add(
114        1,
115        &[
116            opentelemetry::KeyValue::new("step_name", instance.step_name),
117            opentelemetry::KeyValue::new("step_type", instance.step_type),
118            opentelemetry::KeyValue::new("runner_id", runner_id.to_string()),
119        ],
120    );
121
122    Ok(())
123}
124
125#[tracing::instrument(skip(payload, pool, nats_client, log_backend, tls_reloader), fields(run_id = tracing::field::Empty, step_id = tracing::field::Empty))]
126/// Handle step completed.
127pub async fn handle_step_completed(
128    payload: Value,
129    pool: PgPool,
130    nats_client: async_nats::Client,
131    log_backend: Arc<Option<LogBackend>>,
132    tls_reloader: Arc<TlsReloader>,
133) -> Result<()> {
134    let run_id_str = payload["run_id"].as_str().context("Missing run_id")?;
135    let run_id = Uuid::parse_str(run_id_str)?;
136    let step_id_str = payload["step_id"].as_str().context("Missing step_id")?;
137    let step_id = Uuid::parse_str(step_id_str)?;
138
139    let span = tracing::Span::current();
140    span.record("run_id", tracing::field::display(run_id));
141    span.record("step_id", tracing::field::display(step_id));
142
143    info!("Step {} (Run {}) completed successfully", step_id, run_id);
144
145    let mut tx = pool.begin().await?;
146
147    // Lock the workflow run to serialize state evaluations and prevent parallel join race conditions
148    if crate::db::lock_workflow_run(&mut *tx, run_id)
149        .await?
150        .is_none()
151    {
152        debug!(
153            "Workflow run {} already archived or missing, skipping handler",
154            run_id
155        );
156        return Ok(());
157    }
158
159    // 1. Update StepInstance status
160    let instance = fetch_step_instance(step_id, &mut *tx).await?;
161
162    // Ensure we don't process duplicate completion events
163    if instance.status == StepStatus::Succeeded || instance.status == StepStatus::Failed {
164        return Ok(());
165    }
166
167    let _ = release_step_quota_for_instance(&mut *tx, run_id, step_id).await;
168
169    let machine =
170        crate::step_machine::StepMachine::<crate::step_machine::state::Running>::from_instance(
171            instance.clone(),
172        );
173    let _ = machine.succeed(&mut *tx).await?;
174
175    let attributes = [
176        opentelemetry::KeyValue::new("step_name", instance.step_name),
177        opentelemetry::KeyValue::new("step_type", instance.step_type),
178        opentelemetry::KeyValue::new("runner_id", instance.runner_id.unwrap_or_default()),
179    ];
180
181    crate::STEPS_COMPLETED.add(1, &attributes);
182
183    if let Some(started_at) = instance.started_at {
184        let duration = (Utc::now() - started_at).to_std().unwrap_or_default();
185        crate::STEP_DURATION.record(duration.as_secs_f64(), &attributes);
186    }
187
188    // 2. Fetch ALL steps again to ensure we have the latest status for everyone
189    let all_steps: Vec<StepInstance> =
190        crate::db::get_step_instances_by_run_id(&mut *tx, run_id).await?;
191
192    // 3. Persist outputs
193    if let Some(outputs) = payload["outputs"].as_object() {
194        for (key, value) in outputs {
195            crate::db::upsert_step_output(&mut *tx, step_id, key, value).await?;
196        }
197    }
198
199    // 3.5 Persist storage hashes if provided
200    if let Some(storage_hashes) = payload["storage_hashes"].as_object() {
201        for (name, hash_val) in storage_hashes {
202            if let Some(hash) = hash_val.as_str() {
203                crate::db::upsert_run_storage_state(&mut *tx, run_id, name, hash).await?;
204            }
205        }
206    }
207
208    // 3.6 Persist artifact metadata if provided
209    if let Some(artifacts) = payload["artifacts"].as_object() {
210        let context = fetch_run_context(run_id, &mut *tx).await?;
211        let workflow: Workflow = serde_json::from_value(context.workflow_definition)
212            .context("Failed to parse workflow definition from DB")?;
213
214        for (name, meta) in artifacts {
215            // Find which storage this artifact belongs to
216            let storage = workflow
217                .storage
218                .iter()
219                .find(|s| s.artifacts.iter().any(|a| a.name == *name));
220            if let Some(s) = storage {
221                let backend_id: Option<Uuid> = if let Some(ref backend_name) = s.backend {
222                    crate::db::get_storage_backend_id_by_name(&mut *tx, backend_name).await?
223                } else {
224                    crate::db::get_default_sfs_backend_id(&mut *tx).await?
225                };
226
227                if let Some(bid) = backend_id {
228                    let remote_path =
229                        format!("artifacts/{}/{}/{}/{}", run_id, step_id, s.name, name);
230                    crate::db::insert_artifact_registry(
231                        &mut *tx,
232                        run_id,
233                        step_id,
234                        name,
235                        bid,
236                        remote_path,
237                        meta.clone(),
238                    )
239                    .await?;
240                }
241            }
242        }
243    }
244
245    // 3.8 Persist test reports if provided
246    persist_step_test_reports(&payload, &mut tx, run_id, step_id, &pool).await?;
247
248    // 2.5 Scrape outputs from logs if configured
249    let context = fetch_run_context(run_id, &mut *tx).await?;
250    let workflow: Workflow = serde_json::from_value(context.workflow_definition)
251        .context("Failed to parse workflow definition from DB")?;
252
253    let all_steps_initial: Vec<StepInstance> =
254        crate::db::get_step_instances_by_run_id(&mut *tx, run_id).await?;
255
256    let current_step_instance = all_steps_initial
257        .iter()
258        .find(|s| s.id == step_id)
259        .context("Completed step not found in DB")?;
260
261    let dsl_step = workflow
262        .steps
263        .iter()
264        .find(|s| s.name == current_step_instance.step_name);
265
266    if let (Some(dsl_step), Some(backend)) = (dsl_step, &*log_backend) {
267        if !dsl_step.outputs.is_empty() {
268            tracing::info!("Scraping outputs from logs for step {}", dsl_step.name);
269            let logs = backend
270                .fetch_step_logs(
271                    &dsl_step.name,
272                    step_id,
273                    current_step_instance.started_at,
274                    current_step_instance.finished_at,
275                )
276                .await
277                .unwrap_or_default();
278
279            let mut filtered_logs = Vec::new();
280            let mut in_output_block = dsl_step.start_marker.is_none();
281
282            for line in &logs {
283                if let Some(start) = &dsl_step.start_marker {
284                    if line.contains(start) {
285                        in_output_block = true;
286                        continue;
287                    }
288                }
289                if let Some(end) = &dsl_step.end_marker {
290                    if line.contains(end) {
291                        in_output_block = false;
292                        continue;
293                    }
294                }
295                if in_output_block {
296                    filtered_logs.push(line);
297                }
298            }
299
300            for extraction in &dsl_step.outputs {
301                if extraction.source == "logs" || extraction.source == "stdout" {
302                    if let Some(regex_str) = &extraction.regex {
303                        if let Ok(re) = regex::Regex::new(regex_str) {
304                            for line in filtered_logs.iter().rev() {
305                                if let Some(caps) = re.captures(line) {
306                                    let value = if let Some(marker) = &extraction.marker {
307                                        if line.contains(marker) {
308                                            caps.get(extraction.group.unwrap_or(1) as usize)
309                                                .map(|m| m.as_str().to_string())
310                                        } else {
311                                            None
312                                        }
313                                    } else {
314                                        caps.get(extraction.group.unwrap_or(1) as usize)
315                                            .map(|m| m.as_str().to_string())
316                                    };
317
318                                    if let Some(val) = value {
319                                        crate::db::upsert_step_output_with_sensitivity(
320                                            &mut *tx,
321                                            step_id,
322                                            &extraction.name,
323                                            &serde_json::json!(val),
324                                            extraction.sensitive.unwrap_or(false),
325                                        )
326                                        .await?;
327                                        break;
328                                    }
329                                }
330                            }
331                        }
332                    }
333                }
334            }
335        }
336    }
337
338    if let Some(dsl_step) = dsl_step {
339        // 3.1 Check if this step is iterated and if we need to schedule more batches
340        let all_instances_of_this_step: Vec<&StepInstance> = all_steps
341            .iter()
342            .filter(|s| s.step_name == dsl_step.name)
343            .collect();
344
345        let finished_instances = all_instances_of_this_step
346            .iter()
347            .filter(|s| s.status == StepStatus::Succeeded || s.status == StepStatus::Skipped)
348            .count();
349
350        let total_instances = all_instances_of_this_step.len();
351
352        if finished_instances < total_instances {
353            // Some instances are still running or waiting
354            let waiting_instances: Vec<&&StepInstance> = all_instances_of_this_step
355                .iter()
356                .filter(|s| s.status == StepStatus::WaitingForEvent)
357                .collect();
358
359            if !waiting_instances.is_empty() {
360                let running_or_pending = all_instances_of_this_step
361                    .iter()
362                    .filter(|s| s.status == StepStatus::Running || s.status == StepStatus::Pending)
363                    .count();
364
365                let max_parallel = dsl_step
366                    .strategy
367                    .as_ref()
368                    .and_then(|s| s.max_parallel)
369                    .unwrap_or(u32::MAX);
370
371                if (running_or_pending as u32) < max_parallel {
372                    let to_schedule = max_parallel - (running_or_pending as u32);
373                    for next_instance in waiting_instances.iter().take(to_schedule as usize) {
374                        let machine =
375                            crate::step_machine::StepMachine::<
376                                crate::step_machine::state::WaitingForEvent,
377                            >::from_instance((**next_instance).clone());
378                        let _ = machine.reschedule(&mut *tx).await?;
379
380                        let inst_data: (Value, Value) =
381                            crate::db::get_step_spec_and_params(&mut *tx, next_instance.id).await?;
382
383                        dispatch_step_instance(
384                            run_id,
385                            next_instance.id,
386                            &dsl_step.name,
387                            &dsl_step.r#type,
388                            &inst_data.0,
389                            &inst_data.1,
390                            nats_client.clone(),
391                            pool.clone(),
392                            tls_reloader.clone(),
393                        )
394                        .await?;
395                    }
396                }
397            }
398            tx.commit().await?;
399            return Ok(());
400        }
401
402        // 4. All instances of THIS step are done, evaluate successors
403        if !dsl_step.next.is_empty() {
404            let hcl_ctx = crate::hcl_eval::create_context(
405                context.inputs.clone(),
406                run_id,
407                fetch_outputs(run_id, &mut *tx).await?,
408            );
409
410            for next_step_name in &dsl_step.next {
411                let predecessors: Vec<&ast::Step> = workflow
412                    .steps
413                    .iter()
414                    .filter(|s| s.next.contains(next_step_name))
415                    .collect();
416
417                let all_predecessors_done = predecessors.iter().all(|pred_dsl| {
418                    let pred_instances: Vec<&StepInstance> = all_steps
419                        .iter()
420                        .filter(|s| s.step_name == pred_dsl.name)
421                        .collect();
422
423                    !pred_instances.is_empty()
424                        && pred_instances.iter().all(|s| {
425                            s.status == StepStatus::Succeeded || s.status == StepStatus::Skipped
426                        })
427                });
428
429                if all_predecessors_done {
430                    if let Some(next_dsl) =
431                        workflow.steps.iter().find(|s| s.name == *next_step_name)
432                    {
433                        #[allow(clippy::explicit_auto_deref)]
434                        schedule_step(
435                            run_id,
436                            next_dsl,
437                            &mut *tx,
438                            nats_client.clone(),
439                            &hcl_ctx,
440                            pool.clone(),
441                            &workflow,
442                        )
443                        .await?;
444                    }
445                }
446            }
447        }
448    }
449
450    // 5. Check if all steps in the workflow are Done
451    let all_steps_final: Vec<StepInstance> =
452        crate::db::get_step_instances_by_run_id(&mut *tx, run_id).await?;
453
454    let all_dsl_steps_done = workflow.steps.iter().all(|dsl_step| {
455        let step_instances: Vec<&StepInstance> = all_steps_final
456            .iter()
457            .filter(|s| s.step_name == dsl_step.name)
458            .collect();
459
460        !step_instances.is_empty()
461            && step_instances
462                .iter()
463                .all(|s| s.status == StepStatus::Succeeded || s.status == StepStatus::Skipped)
464    });
465
466    if all_dsl_steps_done {
467        let run = fetch_run(run_id, &mut *tx).await?;
468        let machine = WorkflowMachine::<state::Running>::new_from_run(run.clone());
469        let _ = machine.succeed(&mut *tx).await?;
470
471        crate::RUNS_COMPLETED.add(
472            1,
473            &[
474                opentelemetry::KeyValue::new("workflow_name", run.workflow_name),
475                opentelemetry::KeyValue::new("initiating_user", run.initiating_user),
476            ],
477        );
478
479        tx.commit().await?;
480        info!(
481            "Workflow run {} completed successfully, archiving...",
482            run_id
483        );
484        archive_workflow(run_id, pool.clone()).await?;
485        return Ok(());
486    } else {
487        tx.commit().await?;
488    }
489
490    if let Err(e) = dispatch_pending_steps(run_id, pool, nats_client, tls_reloader).await {
491        error!(
492            "Failed to dispatch pending steps for run {}: {:?}",
493            run_id, e
494        );
495    }
496
497    Ok(())
498}
499
500#[tracing::instrument(skip(payload, pool, nats_client, tls_reloader), fields(run_id = tracing::field::Empty, step_id = tracing::field::Empty))]
501/// Handle step failed.
502pub async fn handle_step_failed(
503    payload: Value,
504    pool: PgPool,
505    nats_client: async_nats::Client,
506    tls_reloader: Arc<TlsReloader>,
507) -> Result<()> {
508    let run_id_str = payload["run_id"].as_str().context("Missing run_id")?;
509    let run_id = Uuid::parse_str(run_id_str)?;
510    let step_id_str = payload["step_id"].as_str().context("Missing step_id")?;
511    let step_id = Uuid::parse_str(step_id_str)?;
512
513    let span = tracing::Span::current();
514    span.record("run_id", tracing::field::display(run_id));
515    span.record("step_id", tracing::field::display(step_id));
516    let error_msg = payload["error"].as_str().unwrap_or("Unknown error");
517    let exit_code = payload["exit_code"].as_i64().map(|c| c as i32);
518
519    info!("Step {} (Run {}) failed: {}", step_id, run_id, error_msg);
520
521    let mut tx = pool.begin().await?;
522
523    if crate::db::lock_workflow_run(&mut *tx, run_id)
524        .await?
525        .is_none()
526    {
527        return Ok(());
528    }
529
530    let instance = fetch_step_instance(step_id, &mut *tx).await?;
531
532    // Ensure we don't process duplicate completion events
533    if instance.status == StepStatus::Succeeded || instance.status == StepStatus::Failed {
534        return Ok(());
535    }
536
537    let _ = release_step_quota_for_instance(&mut *tx, run_id, step_id).await;
538
539    let machine =
540        crate::step_machine::StepMachine::<crate::step_machine::state::Running>::from_instance(
541            instance.clone(),
542        );
543    let _ = machine
544        .fail(error_msg.to_string(), exit_code, &mut *tx)
545        .await?;
546
547    let attributes = [
548        opentelemetry::KeyValue::new("step_name", instance.step_name),
549        opentelemetry::KeyValue::new("step_type", instance.step_type),
550        opentelemetry::KeyValue::new("runner_id", instance.runner_id.unwrap_or_default()),
551        opentelemetry::KeyValue::new("error", error_msg.to_string()),
552    ];
553
554    crate::STEPS_FAILED.add(1, &attributes);
555
556    if let Some(started_at) = instance.started_at {
557        let duration = (Utc::now() - started_at).to_std().unwrap_or_default();
558        crate::STEP_DURATION.record(duration.as_secs_f64(), &attributes);
559    }
560
561    if let Some(outputs) = payload["outputs"].as_object() {
562        for (key, value) in outputs {
563            crate::db::upsert_step_output(&mut *tx, step_id, key, value).await?;
564        }
565    }
566
567    // Persist test reports even on failure
568    persist_step_test_reports(&payload, &mut tx, run_id, step_id, &pool).await?;
569
570    let run = fetch_run(run_id, &mut *tx).await?;
571    let run_machine = WorkflowMachine::<state::Running>::new_from_run(run.clone());
572    let _ = run_machine
573        .fail(format!("Step {} failed: {}", step_id, error_msg), &mut *tx)
574        .await?;
575
576    crate::RUNS_FAILED.add(
577        1,
578        &[
579            opentelemetry::KeyValue::new("workflow_name", run.workflow_name),
580            opentelemetry::KeyValue::new("initiating_user", run.initiating_user),
581            opentelemetry::KeyValue::new("error", format!("Step {} failed", step_id)),
582        ],
583    );
584
585    tx.commit().await?;
586    info!("Workflow run {} failed, archiving...", run_id);
587    archive_workflow(run_id, pool.clone()).await?;
588
589    if let Err(e) = dispatch_pending_steps(run_id, pool, nats_client, tls_reloader).await {
590        error!(
591            "Failed to dispatch pending steps after failure for run {}: {:?}",
592            run_id, e
593        );
594    }
595
596    Ok(())
597}
598
599async fn persist_step_test_reports(
600    payload: &Value,
601    tx: &mut Transaction<'_, Postgres>,
602    run_id: Uuid,
603    step_id: Uuid,
604    pool: &PgPool,
605) -> Result<()> {
606    if let Some(reports) = payload["test_reports"].as_object() {
607        for (_key, report_val) in reports {
608            let name = report_val
609                .get("name")
610                .and_then(|v| v.as_str())
611                .unwrap_or("unknown");
612            let file_name = report_val
613                .get("file_name")
614                .and_then(|v| v.as_str())
615                .unwrap_or("unknown");
616            let format = report_val
617                .get("format")
618                .and_then(|v| v.as_str())
619                .unwrap_or("unknown");
620            let hash = report_val
621                .get("hash")
622                .and_then(|v| v.as_str())
623                .unwrap_or("");
624
625            if report_val
626                .get("is_claim")
627                .and_then(|v| v.as_bool())
628                .unwrap_or(false)
629            {
630                let remote_path = report_val.get("remote_path").and_then(|v| v.as_str());
631                let backend_id = report_val.get("backend_id").and_then(|v| {
632                    if let Some(s) = v.as_str() {
633                        Uuid::parse_str(s).ok()
634                    } else {
635                        None
636                    }
637                });
638
639                if let (Some(path), Some(bid)) = (remote_path, backend_id) {
640                    // Download and parse
641                    let backend: StorageBackend =
642                        crate::db::storage::get_storage_backend_by_id(pool, bid)
643                            .await?
644                            .ok_or_else(|| anyhow::anyhow!("Storage backend not found"))?;
645
646                    let client = crate::s3::get_s3_client(&backend).await?;
647                    let bucket = backend.config["bucket"]
648                        .as_str()
649                        .context("Missing bucket")?;
650                    let response = client.get_object().bucket(bucket).key(path).send().await?;
651                    let data = response.body.collect().await?.to_vec();
652
653                    // It's a tar.gz
654                    let mut summaries = Vec::new();
655                    let mut test_cases = Vec::new();
656                    let mut raw_contents = Vec::new();
657
658                    {
659                        // Scope for !Send Archive
660                        let tar_gz = GzDecoder::new(&data[..]);
661                        let mut archive = Archive::new(tar_gz);
662                        for entry in archive.entries()? {
663                            let mut entry = entry?;
664                            let mut content = String::new();
665                            entry.read_to_string(&mut content)?;
666
667                            if format == "junit" {
668                                if let Ok((summary, cases)) =
669                                    crate::junit::parse_junit(&content, name, run_id, step_id)
670                                {
671                                    summaries.push(summary);
672                                    test_cases.extend(cases);
673                                }
674                            }
675                            raw_contents.push(content);
676                        }
677                    }
678
679                    for case in test_cases {
680                        crate::db::insert_step_test_case(&mut **tx, run_id, step_id, name, &case)
681                            .await?;
682                    }
683
684                    if let Some(final_summary) = crate::junit::aggregate_summaries(&summaries) {
685                        crate::db::insert_step_test_summary(
686                            &mut **tx,
687                            run_id,
688                            step_id,
689                            name,
690                            &final_summary,
691                        )
692                        .await?;
693                    }
694
695                    let combined_raw = raw_contents.join("\n---\n");
696
697                    crate::db::insert_step_test_report(
698                        &mut **tx,
699                        run_id,
700                        step_id,
701                        name,
702                        file_name,
703                        format,
704                        Some(&combined_raw),
705                        hash,
706                        Some(bid),
707                        Some(path),
708                    )
709                    .await?;
710                }
711            } else if let Some(content) = report_val.get("content").and_then(|v| v.as_str()) {
712                // Legacy in-memory report
713                if format == "junit" {
714                    if let Ok((summary, cases)) =
715                        crate::junit::parse_junit(content, name, run_id, step_id)
716                    {
717                        crate::db::insert_step_test_summary(
718                            &mut **tx, run_id, step_id, name, &summary,
719                        )
720                        .await?;
721                        for case in cases {
722                            crate::db::insert_step_test_case(
723                                &mut **tx, run_id, step_id, name, &case,
724                            )
725                            .await?;
726                        }
727                    }
728                }
729
730                crate::db::insert_step_test_report(
731                    &mut **tx,
732                    run_id,
733                    step_id,
734                    name,
735                    file_name,
736                    format,
737                    Some(content),
738                    hash,
739                    None,
740                    None,
741                )
742                .await?;
743            }
744        }
745    }
746    Ok(())
747}
748
749/// Handles incoming queries for step status or output data over NATS.
750pub async fn handle_step_query(
751    payload: Value,
752    pool: PgPool,
753    nats_client: async_nats::Client,
754    reply: Option<String>,
755) -> Result<()> {
756    let step_id_str = payload["step_id"].as_str().context("Missing step_id")?;
757    let step_id = Uuid::parse_str(step_id_str)?;
758
759    let step: Option<StepInstance> = crate::db::get_step_instance_by_id(&pool, step_id)
760        .await
761        .map(|v: Option<StepInstance>| v)?;
762
763    if let Some(reply_subject) = reply {
764        let response = if let Some(s) = step {
765            serde_json::json!({
766                "step_id": step_id,
767                "status": s.status,
768                "exists": true
769            })
770        } else {
771            serde_json::json!({
772                "step_id": step_id,
773                "exists": false
774            })
775        };
776        nats_client
777            .publish(reply_subject, response.to_string().into())
778            .await?;
779    }
780
781    Ok(())
782}