stormchaser_engine/handler/step/events/
query.rs1use anyhow::{Context, Result};
2use serde_json::Value;
3use sqlx::PgPool;
4use stormchaser_model::events::{EventSource, SchemaVersion};
5use stormchaser_model::events::{EventType, StepEventType, StepQueryResponseEvent};
6use stormchaser_model::nats::publish_cloudevent;
7use stormchaser_model::StepInstance;
8use stormchaser_model::StepInstanceId;
9
10pub async fn handle_step_query(
12 payload: Value,
13 pool: PgPool,
14 nats_client: async_nats::Client,
15 reply: Option<String>,
16) -> Result<()> {
17 let step_id_str = payload["step_id"].as_str().context("Missing step_id")?;
18 let step_id = uuid::Uuid::parse_str(step_id_str).map(StepInstanceId::new)?;
19
20 let step: Option<StepInstance> = crate::db::get_step_instance_by_id(&pool, step_id)
21 .await
22 .map(|v: Option<StepInstance>| v)?;
23
24 if let Some(reply_subject) = reply {
25 let response = if let Some(s) = step {
26 StepQueryResponseEvent {
27 step_id,
28 status: Some(s.status),
29 exists: true,
30 }
31 } else {
32 StepQueryResponseEvent {
33 step_id,
34 status: None,
35 exists: false,
36 }
37 };
38 use stormchaser_model::nats::NatsSubject;
39 publish_cloudevent(
40 &async_nats::jetstream::new(nats_client.clone()),
41 NatsSubject::Custom(reply_subject.clone()),
42 EventType::Step(StepEventType::QueryResponse),
43 EventSource::System,
44 serde_json::to_value(response).unwrap(),
45 Some(SchemaVersion::new("1.0".to_string())),
46 None,
47 )
48 .await?;
49 }
50
51 Ok(())
52}