1#![allow(clippy::explicit_auto_deref)]
2use crate::handler::{
3 archive_workflow, dispatch_pending_steps, fetch_outputs, fetch_run, fetch_run_context,
4 fetch_step_instance,
5};
6use crate::workflow_machine::{state, WorkflowMachine};
7use anyhow::{Context, Result};
8use chrono::Utc;
9use flate2::read::GzDecoder;
10use serde_json::Value;
11use sqlx::{PgPool, Postgres, Transaction};
12use std::io::Read;
13use std::sync::Arc;
14use stormchaser_dsl::ast::Workflow;
15use stormchaser_model::step::{StepInstance, StepStatus};
16use stormchaser_tls::TlsReloader;
17use tar::Archive;
18use tracing::{debug, error, info};
19use uuid::Uuid;
20
21use super::dispatch::dispatch_step_instance;
22use super::quota::release_step_quota_for_instance;
23use super::scheduling::schedule_step;
24
25use stormchaser_dsl::ast;
26use stormchaser_model::LogBackend;
27use stormchaser_model::StorageBackend;
28
29#[tracing::instrument(skip(payload, pool), fields(run_id = tracing::field::Empty, step_id = tracing::field::Empty))]
30pub async fn handle_step_unpacking_sfs(payload: Value, pool: PgPool) -> Result<()> {
32 let run_id_str = payload["run_id"].as_str().context("Missing run_id")?;
33 let run_id = Uuid::parse_str(run_id_str)?;
34 let step_id_str = payload["step_id"].as_str().context("Missing step_id")?;
35 let step_id = Uuid::parse_str(step_id_str)?;
36
37 let span = tracing::Span::current();
38 span.record("run_id", tracing::field::display(run_id));
39 span.record("step_id", tracing::field::display(step_id));
40 let runner_id = payload["runner_id"].as_str().unwrap_or("unknown");
41
42 info!(
43 "Step {} (Run {}) is now unpacking SFS on runner {}",
44 step_id, run_id, runner_id
45 );
46
47 let instance = fetch_step_instance(step_id, &pool).await?;
48 let machine =
49 crate::step_machine::StepMachine::<crate::step_machine::state::Pending>::from_instance(
50 instance,
51 );
52 let _ = machine
53 .start_unpacking(runner_id.to_string(), &mut *pool.acquire().await?)
54 .await?;
55
56 Ok(())
57}
58
59#[tracing::instrument(skip(payload, pool), fields(run_id = tracing::field::Empty, step_id = tracing::field::Empty))]
60pub async fn handle_step_packing_sfs(payload: Value, pool: PgPool) -> Result<()> {
62 let run_id_str = payload["run_id"].as_str().context("Missing run_id")?;
63 let run_id = Uuid::parse_str(run_id_str)?;
64 let step_id_str = payload["step_id"].as_str().context("Missing step_id")?;
65 let step_id = Uuid::parse_str(step_id_str)?;
66
67 let span = tracing::Span::current();
68 span.record("run_id", tracing::field::display(run_id));
69 span.record("step_id", tracing::field::display(step_id));
70
71 info!("Step {} (Run {}) is now packing SFS", step_id, run_id);
72
73 let instance = fetch_step_instance(step_id, &pool).await?;
74 let machine =
75 crate::step_machine::StepMachine::<crate::step_machine::state::Running>::from_instance(
76 instance,
77 );
78 let _ = machine.start_packing(&mut *pool.acquire().await?).await?;
79
80 Ok(())
81}
82
83#[tracing::instrument(skip(payload, pool), fields(run_id = tracing::field::Empty, step_id = tracing::field::Empty))]
84pub async fn handle_step_running(payload: Value, pool: PgPool) -> Result<()> {
86 let run_id_str = payload["run_id"].as_str().context("Missing run_id")?;
87 let run_id = Uuid::parse_str(run_id_str)?;
88 let step_id_str = payload["step_id"].as_str().context("Missing step_id")?;
89 let step_id = Uuid::parse_str(step_id_str)?;
90
91 let span = tracing::Span::current();
92 span.record("run_id", tracing::field::display(run_id));
93 span.record("step_id", tracing::field::display(step_id));
94 let runner_id = payload["runner_id"].as_str().unwrap_or("unknown");
95
96 info!(
97 "Step {} (Run {}) is now running on runner {}",
98 step_id, run_id, runner_id
99 );
100
101 let instance = fetch_step_instance(step_id, &pool).await?;
103
104 let machine =
106 crate::step_machine::StepMachine::<crate::step_machine::state::Pending>::from_instance(
107 instance.clone(),
108 );
109 let _ = machine
110 .start(runner_id.to_string(), &mut *pool.acquire().await?)
111 .await?;
112
113 crate::STEPS_STARTED.add(
114 1,
115 &[
116 opentelemetry::KeyValue::new("step_name", instance.step_name),
117 opentelemetry::KeyValue::new("step_type", instance.step_type),
118 opentelemetry::KeyValue::new("runner_id", runner_id.to_string()),
119 ],
120 );
121
122 Ok(())
123}
124
125#[tracing::instrument(skip(payload, pool, nats_client, log_backend, tls_reloader), fields(run_id = tracing::field::Empty, step_id = tracing::field::Empty))]
126pub async fn handle_step_completed(
128 payload: Value,
129 pool: PgPool,
130 nats_client: async_nats::Client,
131 log_backend: Arc<Option<LogBackend>>,
132 tls_reloader: Arc<TlsReloader>,
133) -> Result<()> {
134 let run_id_str = payload["run_id"].as_str().context("Missing run_id")?;
135 let run_id = Uuid::parse_str(run_id_str)?;
136 let step_id_str = payload["step_id"].as_str().context("Missing step_id")?;
137 let step_id = Uuid::parse_str(step_id_str)?;
138
139 let span = tracing::Span::current();
140 span.record("run_id", tracing::field::display(run_id));
141 span.record("step_id", tracing::field::display(step_id));
142
143 info!("Step {} (Run {}) completed successfully", step_id, run_id);
144
145 let mut tx = pool.begin().await?;
146
147 if crate::db::lock_workflow_run(&mut *tx, run_id)
149 .await?
150 .is_none()
151 {
152 debug!(
153 "Workflow run {} already archived or missing, skipping handler",
154 run_id
155 );
156 return Ok(());
157 }
158
159 let instance = fetch_step_instance(step_id, &mut *tx).await?;
161
162 if instance.status == StepStatus::Succeeded || instance.status == StepStatus::Failed {
164 return Ok(());
165 }
166
167 let _ = release_step_quota_for_instance(&mut *tx, run_id, step_id).await;
168
169 let machine =
170 crate::step_machine::StepMachine::<crate::step_machine::state::Running>::from_instance(
171 instance.clone(),
172 );
173 let _ = machine.succeed(&mut *tx).await?;
174
175 let attributes = [
176 opentelemetry::KeyValue::new("step_name", instance.step_name),
177 opentelemetry::KeyValue::new("step_type", instance.step_type),
178 opentelemetry::KeyValue::new("runner_id", instance.runner_id.unwrap_or_default()),
179 ];
180
181 crate::STEPS_COMPLETED.add(1, &attributes);
182
183 if let Some(started_at) = instance.started_at {
184 let duration = (Utc::now() - started_at).to_std().unwrap_or_default();
185 crate::STEP_DURATION.record(duration.as_secs_f64(), &attributes);
186 }
187
188 let all_steps: Vec<StepInstance> =
190 crate::db::get_step_instances_by_run_id(&mut *tx, run_id).await?;
191
192 if let Some(outputs) = payload["outputs"].as_object() {
194 for (key, value) in outputs {
195 crate::db::upsert_step_output(&mut *tx, step_id, key, value).await?;
196 }
197 }
198
199 if let Some(storage_hashes) = payload["storage_hashes"].as_object() {
201 for (name, hash_val) in storage_hashes {
202 if let Some(hash) = hash_val.as_str() {
203 crate::db::upsert_run_storage_state(&mut *tx, run_id, name, hash).await?;
204 }
205 }
206 }
207
208 if let Some(artifacts) = payload["artifacts"].as_object() {
210 let context = fetch_run_context(run_id, &mut *tx).await?;
211 let workflow: Workflow = serde_json::from_value(context.workflow_definition)
212 .context("Failed to parse workflow definition from DB")?;
213
214 for (name, meta) in artifacts {
215 let storage = workflow
217 .storage
218 .iter()
219 .find(|s| s.artifacts.iter().any(|a| a.name == *name));
220 if let Some(s) = storage {
221 let backend_id: Option<Uuid> = if let Some(ref backend_name) = s.backend {
222 crate::db::get_storage_backend_id_by_name(&mut *tx, backend_name).await?
223 } else {
224 crate::db::get_default_sfs_backend_id(&mut *tx).await?
225 };
226
227 if let Some(bid) = backend_id {
228 let remote_path =
229 format!("artifacts/{}/{}/{}/{}", run_id, step_id, s.name, name);
230 crate::db::insert_artifact_registry(
231 &mut *tx,
232 run_id,
233 step_id,
234 name,
235 bid,
236 remote_path,
237 meta.clone(),
238 )
239 .await?;
240 }
241 }
242 }
243 }
244
245 persist_step_test_reports(&payload, &mut tx, run_id, step_id, &pool).await?;
247
248 let context = fetch_run_context(run_id, &mut *tx).await?;
250 let workflow: Workflow = serde_json::from_value(context.workflow_definition)
251 .context("Failed to parse workflow definition from DB")?;
252
253 let all_steps_initial: Vec<StepInstance> =
254 crate::db::get_step_instances_by_run_id(&mut *tx, run_id).await?;
255
256 let current_step_instance = all_steps_initial
257 .iter()
258 .find(|s| s.id == step_id)
259 .context("Completed step not found in DB")?;
260
261 let dsl_step = workflow
262 .steps
263 .iter()
264 .find(|s| s.name == current_step_instance.step_name);
265
266 if let (Some(dsl_step), Some(backend)) = (dsl_step, &*log_backend) {
267 if !dsl_step.outputs.is_empty() {
268 tracing::info!("Scraping outputs from logs for step {}", dsl_step.name);
269 let logs = backend
270 .fetch_step_logs(
271 &dsl_step.name,
272 step_id,
273 current_step_instance.started_at,
274 current_step_instance.finished_at,
275 )
276 .await
277 .unwrap_or_default();
278
279 let mut filtered_logs = Vec::new();
280 let mut in_output_block = dsl_step.start_marker.is_none();
281
282 for line in &logs {
283 if let Some(start) = &dsl_step.start_marker {
284 if line.contains(start) {
285 in_output_block = true;
286 continue;
287 }
288 }
289 if let Some(end) = &dsl_step.end_marker {
290 if line.contains(end) {
291 in_output_block = false;
292 continue;
293 }
294 }
295 if in_output_block {
296 filtered_logs.push(line);
297 }
298 }
299
300 for extraction in &dsl_step.outputs {
301 if extraction.source == "logs" || extraction.source == "stdout" {
302 if let Some(regex_str) = &extraction.regex {
303 if let Ok(re) = regex::Regex::new(regex_str) {
304 for line in filtered_logs.iter().rev() {
305 if let Some(caps) = re.captures(line) {
306 let value = if let Some(marker) = &extraction.marker {
307 if line.contains(marker) {
308 caps.get(extraction.group.unwrap_or(1) as usize)
309 .map(|m| m.as_str().to_string())
310 } else {
311 None
312 }
313 } else {
314 caps.get(extraction.group.unwrap_or(1) as usize)
315 .map(|m| m.as_str().to_string())
316 };
317
318 if let Some(val) = value {
319 crate::db::upsert_step_output_with_sensitivity(
320 &mut *tx,
321 step_id,
322 &extraction.name,
323 &serde_json::json!(val),
324 extraction.sensitive.unwrap_or(false),
325 )
326 .await?;
327 break;
328 }
329 }
330 }
331 }
332 }
333 }
334 }
335 }
336 }
337
338 if let Some(dsl_step) = dsl_step {
339 let all_instances_of_this_step: Vec<&StepInstance> = all_steps
341 .iter()
342 .filter(|s| s.step_name == dsl_step.name)
343 .collect();
344
345 let finished_instances = all_instances_of_this_step
346 .iter()
347 .filter(|s| s.status == StepStatus::Succeeded || s.status == StepStatus::Skipped)
348 .count();
349
350 let total_instances = all_instances_of_this_step.len();
351
352 if finished_instances < total_instances {
353 let waiting_instances: Vec<&&StepInstance> = all_instances_of_this_step
355 .iter()
356 .filter(|s| s.status == StepStatus::WaitingForEvent)
357 .collect();
358
359 if !waiting_instances.is_empty() {
360 let running_or_pending = all_instances_of_this_step
361 .iter()
362 .filter(|s| s.status == StepStatus::Running || s.status == StepStatus::Pending)
363 .count();
364
365 let max_parallel = dsl_step
366 .strategy
367 .as_ref()
368 .and_then(|s| s.max_parallel)
369 .unwrap_or(u32::MAX);
370
371 if (running_or_pending as u32) < max_parallel {
372 let to_schedule = max_parallel - (running_or_pending as u32);
373 for next_instance in waiting_instances.iter().take(to_schedule as usize) {
374 let machine =
375 crate::step_machine::StepMachine::<
376 crate::step_machine::state::WaitingForEvent,
377 >::from_instance((**next_instance).clone());
378 let _ = machine.reschedule(&mut *tx).await?;
379
380 let inst_data: (Value, Value) =
381 crate::db::get_step_spec_and_params(&mut *tx, next_instance.id).await?;
382
383 dispatch_step_instance(
384 run_id,
385 next_instance.id,
386 &dsl_step.name,
387 &dsl_step.r#type,
388 &inst_data.0,
389 &inst_data.1,
390 nats_client.clone(),
391 pool.clone(),
392 tls_reloader.clone(),
393 )
394 .await?;
395 }
396 }
397 }
398 tx.commit().await?;
399 return Ok(());
400 }
401
402 if !dsl_step.next.is_empty() {
404 let hcl_ctx = crate::hcl_eval::create_context(
405 context.inputs.clone(),
406 run_id,
407 fetch_outputs(run_id, &mut *tx).await?,
408 );
409
410 for next_step_name in &dsl_step.next {
411 let predecessors: Vec<&ast::Step> = workflow
412 .steps
413 .iter()
414 .filter(|s| s.next.contains(next_step_name))
415 .collect();
416
417 let all_predecessors_done = predecessors.iter().all(|pred_dsl| {
418 let pred_instances: Vec<&StepInstance> = all_steps
419 .iter()
420 .filter(|s| s.step_name == pred_dsl.name)
421 .collect();
422
423 !pred_instances.is_empty()
424 && pred_instances.iter().all(|s| {
425 s.status == StepStatus::Succeeded || s.status == StepStatus::Skipped
426 })
427 });
428
429 if all_predecessors_done {
430 if let Some(next_dsl) =
431 workflow.steps.iter().find(|s| s.name == *next_step_name)
432 {
433 #[allow(clippy::explicit_auto_deref)]
434 schedule_step(
435 run_id,
436 next_dsl,
437 &mut *tx,
438 nats_client.clone(),
439 &hcl_ctx,
440 pool.clone(),
441 &workflow,
442 )
443 .await?;
444 }
445 }
446 }
447 }
448 }
449
450 let all_steps_final: Vec<StepInstance> =
452 crate::db::get_step_instances_by_run_id(&mut *tx, run_id).await?;
453
454 let all_dsl_steps_done = workflow.steps.iter().all(|dsl_step| {
455 let step_instances: Vec<&StepInstance> = all_steps_final
456 .iter()
457 .filter(|s| s.step_name == dsl_step.name)
458 .collect();
459
460 !step_instances.is_empty()
461 && step_instances
462 .iter()
463 .all(|s| s.status == StepStatus::Succeeded || s.status == StepStatus::Skipped)
464 });
465
466 if all_dsl_steps_done {
467 let run = fetch_run(run_id, &mut *tx).await?;
468 let machine = WorkflowMachine::<state::Running>::new_from_run(run.clone());
469 let _ = machine.succeed(&mut *tx).await?;
470
471 crate::RUNS_COMPLETED.add(
472 1,
473 &[
474 opentelemetry::KeyValue::new("workflow_name", run.workflow_name),
475 opentelemetry::KeyValue::new("initiating_user", run.initiating_user),
476 ],
477 );
478
479 tx.commit().await?;
480 info!(
481 "Workflow run {} completed successfully, archiving...",
482 run_id
483 );
484 archive_workflow(run_id, pool.clone()).await?;
485 return Ok(());
486 } else {
487 tx.commit().await?;
488 }
489
490 if let Err(e) = dispatch_pending_steps(run_id, pool, nats_client, tls_reloader).await {
491 error!(
492 "Failed to dispatch pending steps for run {}: {:?}",
493 run_id, e
494 );
495 }
496
497 Ok(())
498}
499
500#[tracing::instrument(skip(payload, pool, nats_client, tls_reloader), fields(run_id = tracing::field::Empty, step_id = tracing::field::Empty))]
501pub async fn handle_step_failed(
503 payload: Value,
504 pool: PgPool,
505 nats_client: async_nats::Client,
506 tls_reloader: Arc<TlsReloader>,
507) -> Result<()> {
508 let run_id_str = payload["run_id"].as_str().context("Missing run_id")?;
509 let run_id = Uuid::parse_str(run_id_str)?;
510 let step_id_str = payload["step_id"].as_str().context("Missing step_id")?;
511 let step_id = Uuid::parse_str(step_id_str)?;
512
513 let span = tracing::Span::current();
514 span.record("run_id", tracing::field::display(run_id));
515 span.record("step_id", tracing::field::display(step_id));
516 let error_msg = payload["error"].as_str().unwrap_or("Unknown error");
517 let exit_code = payload["exit_code"].as_i64().map(|c| c as i32);
518
519 info!("Step {} (Run {}) failed: {}", step_id, run_id, error_msg);
520
521 let mut tx = pool.begin().await?;
522
523 if crate::db::lock_workflow_run(&mut *tx, run_id)
524 .await?
525 .is_none()
526 {
527 return Ok(());
528 }
529
530 let instance = fetch_step_instance(step_id, &mut *tx).await?;
531
532 if instance.status == StepStatus::Succeeded || instance.status == StepStatus::Failed {
534 return Ok(());
535 }
536
537 let _ = release_step_quota_for_instance(&mut *tx, run_id, step_id).await;
538
539 let machine =
540 crate::step_machine::StepMachine::<crate::step_machine::state::Running>::from_instance(
541 instance.clone(),
542 );
543 let _ = machine
544 .fail(error_msg.to_string(), exit_code, &mut *tx)
545 .await?;
546
547 let attributes = [
548 opentelemetry::KeyValue::new("step_name", instance.step_name),
549 opentelemetry::KeyValue::new("step_type", instance.step_type),
550 opentelemetry::KeyValue::new("runner_id", instance.runner_id.unwrap_or_default()),
551 opentelemetry::KeyValue::new("error", error_msg.to_string()),
552 ];
553
554 crate::STEPS_FAILED.add(1, &attributes);
555
556 if let Some(started_at) = instance.started_at {
557 let duration = (Utc::now() - started_at).to_std().unwrap_or_default();
558 crate::STEP_DURATION.record(duration.as_secs_f64(), &attributes);
559 }
560
561 if let Some(outputs) = payload["outputs"].as_object() {
562 for (key, value) in outputs {
563 crate::db::upsert_step_output(&mut *tx, step_id, key, value).await?;
564 }
565 }
566
567 persist_step_test_reports(&payload, &mut tx, run_id, step_id, &pool).await?;
569
570 let run = fetch_run(run_id, &mut *tx).await?;
571 let run_machine = WorkflowMachine::<state::Running>::new_from_run(run.clone());
572 let _ = run_machine
573 .fail(format!("Step {} failed: {}", step_id, error_msg), &mut *tx)
574 .await?;
575
576 crate::RUNS_FAILED.add(
577 1,
578 &[
579 opentelemetry::KeyValue::new("workflow_name", run.workflow_name),
580 opentelemetry::KeyValue::new("initiating_user", run.initiating_user),
581 opentelemetry::KeyValue::new("error", format!("Step {} failed", step_id)),
582 ],
583 );
584
585 tx.commit().await?;
586 info!("Workflow run {} failed, archiving...", run_id);
587 archive_workflow(run_id, pool.clone()).await?;
588
589 if let Err(e) = dispatch_pending_steps(run_id, pool, nats_client, tls_reloader).await {
590 error!(
591 "Failed to dispatch pending steps after failure for run {}: {:?}",
592 run_id, e
593 );
594 }
595
596 Ok(())
597}
598
599async fn persist_step_test_reports(
600 payload: &Value,
601 tx: &mut Transaction<'_, Postgres>,
602 run_id: Uuid,
603 step_id: Uuid,
604 pool: &PgPool,
605) -> Result<()> {
606 if let Some(reports) = payload["test_reports"].as_object() {
607 for (_key, report_val) in reports {
608 let name = report_val
609 .get("name")
610 .and_then(|v| v.as_str())
611 .unwrap_or("unknown");
612 let file_name = report_val
613 .get("file_name")
614 .and_then(|v| v.as_str())
615 .unwrap_or("unknown");
616 let format = report_val
617 .get("format")
618 .and_then(|v| v.as_str())
619 .unwrap_or("unknown");
620 let hash = report_val
621 .get("hash")
622 .and_then(|v| v.as_str())
623 .unwrap_or("");
624
625 if report_val
626 .get("is_claim")
627 .and_then(|v| v.as_bool())
628 .unwrap_or(false)
629 {
630 let remote_path = report_val.get("remote_path").and_then(|v| v.as_str());
631 let backend_id = report_val.get("backend_id").and_then(|v| {
632 if let Some(s) = v.as_str() {
633 Uuid::parse_str(s).ok()
634 } else {
635 None
636 }
637 });
638
639 if let (Some(path), Some(bid)) = (remote_path, backend_id) {
640 let backend: StorageBackend =
642 crate::db::storage::get_storage_backend_by_id(pool, bid)
643 .await?
644 .ok_or_else(|| anyhow::anyhow!("Storage backend not found"))?;
645
646 let client = crate::s3::get_s3_client(&backend).await?;
647 let bucket = backend.config["bucket"]
648 .as_str()
649 .context("Missing bucket")?;
650 let response = client.get_object().bucket(bucket).key(path).send().await?;
651 let data = response.body.collect().await?.to_vec();
652
653 let mut summaries = Vec::new();
655 let mut test_cases = Vec::new();
656 let mut raw_contents = Vec::new();
657
658 {
659 let tar_gz = GzDecoder::new(&data[..]);
661 let mut archive = Archive::new(tar_gz);
662 for entry in archive.entries()? {
663 let mut entry = entry?;
664 let mut content = String::new();
665 entry.read_to_string(&mut content)?;
666
667 if format == "junit" {
668 if let Ok((summary, cases)) =
669 crate::junit::parse_junit(&content, name, run_id, step_id)
670 {
671 summaries.push(summary);
672 test_cases.extend(cases);
673 }
674 }
675 raw_contents.push(content);
676 }
677 }
678
679 for case in test_cases {
680 crate::db::insert_step_test_case(&mut **tx, run_id, step_id, name, &case)
681 .await?;
682 }
683
684 if let Some(final_summary) = crate::junit::aggregate_summaries(&summaries) {
685 crate::db::insert_step_test_summary(
686 &mut **tx,
687 run_id,
688 step_id,
689 name,
690 &final_summary,
691 )
692 .await?;
693 }
694
695 let combined_raw = raw_contents.join("\n---\n");
696
697 crate::db::insert_step_test_report(
698 &mut **tx,
699 run_id,
700 step_id,
701 name,
702 file_name,
703 format,
704 Some(&combined_raw),
705 hash,
706 Some(bid),
707 Some(path),
708 )
709 .await?;
710 }
711 } else if let Some(content) = report_val.get("content").and_then(|v| v.as_str()) {
712 if format == "junit" {
714 if let Ok((summary, cases)) =
715 crate::junit::parse_junit(content, name, run_id, step_id)
716 {
717 crate::db::insert_step_test_summary(
718 &mut **tx, run_id, step_id, name, &summary,
719 )
720 .await?;
721 for case in cases {
722 crate::db::insert_step_test_case(
723 &mut **tx, run_id, step_id, name, &case,
724 )
725 .await?;
726 }
727 }
728 }
729
730 crate::db::insert_step_test_report(
731 &mut **tx,
732 run_id,
733 step_id,
734 name,
735 file_name,
736 format,
737 Some(content),
738 hash,
739 None,
740 None,
741 )
742 .await?;
743 }
744 }
745 }
746 Ok(())
747}
748
749pub async fn handle_step_query(
751 payload: Value,
752 pool: PgPool,
753 nats_client: async_nats::Client,
754 reply: Option<String>,
755) -> Result<()> {
756 let step_id_str = payload["step_id"].as_str().context("Missing step_id")?;
757 let step_id = Uuid::parse_str(step_id_str)?;
758
759 let step: Option<StepInstance> = crate::db::get_step_instance_by_id(&pool, step_id)
760 .await
761 .map(|v: Option<StepInstance>| v)?;
762
763 if let Some(reply_subject) = reply {
764 let response = if let Some(s) = step {
765 serde_json::json!({
766 "step_id": step_id,
767 "status": s.status,
768 "exists": true
769 })
770 } else {
771 serde_json::json!({
772 "step_id": step_id,
773 "exists": false
774 })
775 };
776 nats_client
777 .publish(reply_subject, response.to_string().into())
778 .await?;
779 }
780
781 Ok(())
782}