stormchaser_engine/handler/step/events/
query.rs1use anyhow::{Context, Result};
2use serde_json::Value;
3use sqlx::PgPool;
4use stormchaser_model::events::StepQueryResponseEvent;
5use stormchaser_model::StepInstance;
6use stormchaser_model::StepInstanceId;
7
8pub async fn handle_step_query(
10 payload: Value,
11 pool: PgPool,
12 nats_client: async_nats::Client,
13 reply: Option<String>,
14) -> Result<()> {
15 let step_id_str = payload["step_id"].as_str().context("Missing step_id")?;
16 let step_id = uuid::Uuid::parse_str(step_id_str).map(StepInstanceId::new)?;
17
18 let step: Option<StepInstance> = crate::db::get_step_instance_by_id(&pool, step_id)
19 .await
20 .map(|v: Option<StepInstance>| v)?;
21
22 if let Some(reply_subject) = reply {
23 let response = if let Some(s) = step {
24 let status_str = serde_json::to_value(&s.status)
25 .ok()
26 .and_then(|v| v.as_str().map(str::to_string));
27 StepQueryResponseEvent {
28 step_id,
29 status: status_str,
30 exists: true,
31 }
32 } else {
33 StepQueryResponseEvent {
34 step_id,
35 status: None,
36 exists: false,
37 }
38 };
39 stormchaser_model::nats::publish_cloudevent(
40 &async_nats::jetstream::new(nats_client.clone()),
41 &reply_subject,
42 &reply_subject,
43 "/stormchaser",
44 serde_json::to_value(response).unwrap(),
45 Some("1.0"),
46 None,
47 )
48 .await?;
49 }
50
51 Ok(())
52}