Skip to main content

stormchaser_engine/handler/step/events/
query.rs

1use anyhow::{Context, Result};
2use serde_json::Value;
3use sqlx::PgPool;
4use stormchaser_model::events::{EventSource, SchemaVersion};
5use stormchaser_model::events::{EventType, StepEventType, StepQueryResponseEvent};
6use stormchaser_model::nats::publish_cloudevent;
7use stormchaser_model::StepInstance;
8use stormchaser_model::StepInstanceId;
9
10/// Handles incoming queries for step status or output data over NATS.
11pub async fn handle_step_query(
12    payload: Value,
13    pool: PgPool,
14    nats_client: async_nats::Client,
15    reply: Option<String>,
16) -> Result<()> {
17    let step_id_str = payload["step_id"].as_str().context("Missing step_id")?;
18    let step_id = uuid::Uuid::parse_str(step_id_str).map(StepInstanceId::new)?;
19
20    let step: Option<StepInstance> = crate::db::get_step_instance_by_id(&pool, step_id)
21        .await
22        .map(|v: Option<StepInstance>| v)?;
23
24    if let Some(reply_subject) = reply {
25        let response = if let Some(s) = step {
26            StepQueryResponseEvent {
27                step_id,
28                status: Some(s.status),
29                exists: true,
30            }
31        } else {
32            StepQueryResponseEvent {
33                step_id,
34                status: None,
35                exists: false,
36            }
37        };
38        use stormchaser_model::nats::NatsSubject;
39        publish_cloudevent(
40            &async_nats::jetstream::new(nats_client.clone()),
41            NatsSubject::Custom(reply_subject.clone()),
42            EventType::Step(StepEventType::QueryResponse),
43            EventSource::System,
44            serde_json::to_value(response).unwrap(),
45            Some(SchemaVersion::new("1.0".to_string())),
46            None,
47        )
48        .await?;
49    }
50
51    Ok(())
52}