stormchaser_engine/handler/integrations/
webhook.rs1use crate::handler::{fetch_outputs, fetch_run_context, fetch_step_instance};
2use anyhow::Result;
3use chrono::Utc;
4use serde_json::Value;
5use sqlx::PgPool;
6use std::time::Duration;
7use stormchaser_model::dsl::WebhookInvokeSpec;
8use stormchaser_model::RunId;
9use stormchaser_model::StepInstanceId;
10use tracing::info;
11
12pub async fn handle_webhook_invoke(
14 run_id: RunId,
15 step_id: StepInstanceId,
16 spec: Value,
17 pool: PgPool,
18 nats_client: async_nats::Client,
19) -> Result<()> {
20 let spec: WebhookInvokeSpec = serde_json::from_value(spec)?;
21
22 info!("Invoking webhook {} for run {}", spec.url, run_id);
23
24 let instance = fetch_step_instance(step_id, &pool).await?;
26 let machine =
27 crate::step_machine::StepMachine::<crate::step_machine::state::Pending>::from_instance(
28 instance,
29 );
30 let _ = machine
31 .start("webhook".to_string(), &mut *pool.acquire().await?)
32 .await?;
33
34 let run_context = fetch_run_context(run_id, &pool).await?;
36 let outputs = fetch_outputs(run_id, &pool).await?;
37
38 let template_ctx = serde_json::json!({
39 "inputs": run_context.inputs,
40 "steps": outputs,
41 "run": {
42 "id": run_id.to_string(),
43 }
44 });
45
46 let rendered_body = render_webhook_body(&spec, &template_ctx)?;
48
49 execute_webhook_request(run_id, step_id, &spec, rendered_body, pool, nats_client).await
51}
52
53use stormchaser_model::dsl;
54
55fn render_webhook_body(
56 spec: &dsl::WebhookInvokeSpec,
57 template_ctx: &Value,
58) -> Result<Option<String>> {
59 use minijinja::Environment;
60 if let Some(body_tmpl) = &spec.body {
61 let env = Environment::new();
62 Ok(Some(env.render_str(body_tmpl, template_ctx).map_err(
63 |e| anyhow::anyhow!("Failed to render webhook body: {:?}", e),
64 )?))
65 } else {
66 Ok(None)
67 }
68}
69
70async fn execute_webhook_request(
71 run_id: RunId,
72 step_id: StepInstanceId,
73 spec: &dsl::WebhookInvokeSpec,
74 rendered_body: Option<String>,
75 pool: PgPool,
76 nats_client: async_nats::Client,
77) -> Result<()> {
78 let client = reqwest::Client::new();
79 let method = match spec
80 .method
81 .as_deref()
82 .unwrap_or("POST")
83 .to_uppercase()
84 .as_str()
85 {
86 "GET" => reqwest::Method::GET,
87 "POST" => reqwest::Method::POST,
88 "PUT" => reqwest::Method::PUT,
89 "DELETE" => reqwest::Method::DELETE,
90 "PATCH" => reqwest::Method::PATCH,
91 _ => reqwest::Method::POST,
92 };
93
94 let mut builder = client.request(method, &spec.url);
95
96 if let Some(headers) = &spec.headers {
97 for (k, v) in headers {
98 builder = builder.header(k, v);
99 }
100 }
101
102 if let Some(body) = rendered_body {
103 builder = builder.body(body);
104 }
105
106 let timeout = spec
107 .timeout
108 .as_ref()
109 .and_then(|t| humantime::parse_duration(t).ok())
110 .unwrap_or(Duration::from_secs(30));
111
112 let res = builder.timeout(timeout).send().await?;
113 let status = res.status();
114
115 if status.is_success() {
116 let body_bytes = res.bytes().await?;
117 let body_val: Value = serde_json::from_slice(&body_bytes).unwrap_or_else(
118 |_| serde_json::json!({ "text": String::from_utf8_lossy(&body_bytes) }),
119 );
120
121 let instance = fetch_step_instance(step_id, &pool).await?;
122 let machine =
123 crate::step_machine::StepMachine::<crate::step_machine::state::Running>::from_instance(
124 instance,
125 );
126 let _ = machine.succeed(&mut *pool.acquire().await?).await?;
127
128 let event = serde_json::json!({
129 "run_id": run_id,
130 "step_id": step_id,
131 "event_type": "step_completed",
132 "outputs": body_val,
133 "timestamp": Utc::now(),
134 });
135 let js = async_nats::jetstream::new(nats_client);
136 js.publish("stormchaser.step.completed", event.to_string().into())
137 .await?;
138 Ok(())
139 } else {
140 let error_body = res
141 .text()
142 .await
143 .unwrap_or_else(|_| "Unknown error".to_string());
144 anyhow::bail!("Webhook failed with status {}: {}", status, error_body);
145 }
146}
147
148#[cfg(test)]
149mod tests {
150 use super::*;
151 use serde_json::json;
152
153 #[test]
154 fn test_render_webhook_body_happy_path() {
155 let spec = WebhookInvokeSpec {
156 url: "http://example.com".to_string(),
157 method: Some("POST".to_string()),
158 headers: None,
159 body: Some("Hello {{ inputs.name }}!".to_string()),
160 timeout: None,
161 };
162 let ctx = json!({
163 "inputs": {
164 "name": "World"
165 }
166 });
167 let rendered = render_webhook_body(&spec, &ctx).unwrap();
168 assert_eq!(rendered, Some("Hello World!".to_string()));
169 }
170
171 #[test]
172 fn test_render_webhook_body_no_body() {
173 let spec = WebhookInvokeSpec {
174 url: "http://example.com".to_string(),
175 method: Some("POST".to_string()),
176 headers: None,
177 body: None,
178 timeout: None,
179 };
180 let ctx = json!({});
181 let rendered = render_webhook_body(&spec, &ctx).unwrap();
182 assert_eq!(rendered, None);
183 }
184
185 #[test]
186 fn test_render_webhook_body_error() {
187 let spec_invalid = WebhookInvokeSpec {
188 url: "http://example.com".to_string(),
189 method: None,
190 headers: None,
191 body: Some("Hello {{ inputs.name".to_string()), timeout: None,
193 };
194 let ctx = json!({"inputs": {"name": "World"}});
195 let result = render_webhook_body(&spec_invalid, &ctx);
196 assert!(result.is_err());
197 }
198
199 #[test]
200 fn test_render_webhook_body_with_complex_ctx() {
201 let spec = WebhookInvokeSpec {
202 url: "http://example.com".to_string(),
203 method: None,
204 headers: None,
205 body: Some("Run ID: {{ run.id }}, Step result: {{ steps.test.status }}".to_string()),
206 timeout: None,
207 };
208 let ctx = json!({
209 "run": {
210 "id": "123"
211 },
212 "steps": {
213 "test": {
214 "status": "success"
215 }
216 }
217 });
218 let rendered = render_webhook_body(&spec, &ctx).unwrap();
219 assert_eq!(
220 rendered,
221 Some("Run ID: 123, Step result: success".to_string())
222 );
223 }
224}