Skip to main content

stormchaser_engine/handler/step/events/
query.rs

1use anyhow::{Context, Result};
2use serde_json::Value;
3use sqlx::PgPool;
4use stormchaser_model::events::StepQueryResponseEvent;
5use stormchaser_model::StepInstance;
6use stormchaser_model::StepInstanceId;
7
8/// Handles incoming queries for step status or output data over NATS.
9pub async fn handle_step_query(
10    payload: Value,
11    pool: PgPool,
12    nats_client: async_nats::Client,
13    reply: Option<String>,
14) -> Result<()> {
15    let step_id_str = payload["step_id"].as_str().context("Missing step_id")?;
16    let step_id = uuid::Uuid::parse_str(step_id_str).map(StepInstanceId::new)?;
17
18    let step: Option<StepInstance> = crate::db::get_step_instance_by_id(&pool, step_id)
19        .await
20        .map(|v: Option<StepInstance>| v)?;
21
22    if let Some(reply_subject) = reply {
23        let response = if let Some(s) = step {
24            let status_str = serde_json::to_value(&s.status)
25                .ok()
26                .and_then(|v| v.as_str().map(str::to_string));
27            StepQueryResponseEvent {
28                step_id,
29                status: status_str,
30                exists: true,
31            }
32        } else {
33            StepQueryResponseEvent {
34                step_id,
35                status: None,
36                exists: false,
37            }
38        };
39        stormchaser_model::nats::publish_cloudevent(
40            &async_nats::jetstream::new(nats_client.clone()),
41            &reply_subject,
42            &reply_subject,
43            "/stormchaser",
44            serde_json::to_value(response).unwrap(),
45            Some("1.0"),
46            None,
47        )
48        .await?;
49    }
50
51    Ok(())
52}