stormchaser_engine/handler/integrations/
webhook.rs1use crate::handler::{fetch_outputs, fetch_run_context, fetch_step_instance};
2use anyhow::Result;
3use chrono::Utc;
4use serde_json::Value;
5use sqlx::PgPool;
6use std::time::Duration;
7use tracing::info;
8use uuid::Uuid;
9
10pub async fn handle_webhook_invoke(
12 run_id: Uuid,
13 step_id: Uuid,
14 spec: Value,
15 pool: PgPool,
16 nats_client: async_nats::Client,
17) -> Result<()> {
18 use stormchaser_model::dsl::WebhookInvokeSpec;
19
20 let spec: WebhookInvokeSpec = serde_json::from_value(spec)?;
21
22 info!("Invoking webhook {} for run {}", spec.url, run_id);
23
24 let instance = fetch_step_instance(step_id, &pool).await?;
26 let machine =
27 crate::step_machine::StepMachine::<crate::step_machine::state::Pending>::from_instance(
28 instance,
29 );
30 let _ = machine
31 .start("webhook".to_string(), &mut *pool.acquire().await?)
32 .await?;
33
34 let run_context = fetch_run_context(run_id, &pool).await?;
36 let outputs = fetch_outputs(run_id, &pool).await?;
37
38 let template_ctx = serde_json::json!({
39 "inputs": run_context.inputs,
40 "steps": outputs,
41 "run": {
42 "id": run_id.to_string(),
43 }
44 });
45
46 let rendered_body = render_webhook_body(&spec, &template_ctx)?;
48
49 execute_webhook_request(run_id, step_id, &spec, rendered_body, pool, nats_client).await
51}
52
53use stormchaser_model::dsl;
54
55fn render_webhook_body(
56 spec: &dsl::WebhookInvokeSpec,
57 template_ctx: &Value,
58) -> Result<Option<String>> {
59 use minijinja::Environment;
60 if let Some(body_tmpl) = &spec.body {
61 let env = Environment::new();
62 Ok(Some(env.render_str(body_tmpl, template_ctx).map_err(
63 |e| anyhow::anyhow!("Failed to render webhook body: {:?}", e),
64 )?))
65 } else {
66 Ok(None)
67 }
68}
69
70async fn execute_webhook_request(
71 run_id: Uuid,
72 step_id: Uuid,
73 spec: &dsl::WebhookInvokeSpec,
74 rendered_body: Option<String>,
75 pool: PgPool,
76 nats_client: async_nats::Client,
77) -> Result<()> {
78 let client = reqwest::Client::new();
79 let method = match spec
80 .method
81 .as_deref()
82 .unwrap_or("POST")
83 .to_uppercase()
84 .as_str()
85 {
86 "GET" => reqwest::Method::GET,
87 "POST" => reqwest::Method::POST,
88 "PUT" => reqwest::Method::PUT,
89 "DELETE" => reqwest::Method::DELETE,
90 "PATCH" => reqwest::Method::PATCH,
91 _ => reqwest::Method::POST,
92 };
93
94 let mut builder = client.request(method, &spec.url);
95
96 if let Some(headers) = &spec.headers {
97 for (k, v) in headers {
98 builder = builder.header(k, v);
99 }
100 }
101
102 if let Some(body) = rendered_body {
103 builder = builder.body(body);
104 }
105
106 let timeout = spec
107 .timeout
108 .as_ref()
109 .and_then(|t| humantime::parse_duration(t).ok())
110 .unwrap_or(Duration::from_secs(30));
111
112 let res = builder.timeout(timeout).send().await?;
113 let status = res.status();
114
115 if status.is_success() {
116 let body_bytes = res.bytes().await?;
117 let body_val: Value = serde_json::from_slice(&body_bytes).unwrap_or_else(
118 |_| serde_json::json!({ "text": String::from_utf8_lossy(&body_bytes) }),
119 );
120
121 let instance = fetch_step_instance(step_id, &pool).await?;
122 let machine =
123 crate::step_machine::StepMachine::<crate::step_machine::state::Running>::from_instance(
124 instance,
125 );
126 let _ = machine.succeed(&mut *pool.acquire().await?).await?;
127
128 let event = serde_json::json!({
129 "run_id": run_id,
130 "step_id": step_id,
131 "event_type": "step_completed",
132 "outputs": body_val,
133 "timestamp": Utc::now(),
134 });
135 let js = async_nats::jetstream::new(nats_client);
136 js.publish("stormchaser.step.completed", event.to_string().into())
137 .await?;
138 Ok(())
139 } else {
140 let error_body = res
141 .text()
142 .await
143 .unwrap_or_else(|_| "Unknown error".to_string());
144 anyhow::bail!("Webhook failed with status {}: {}", status, error_body);
145 }
146}
147
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use stormchaser_model::dsl::WebhookInvokeSpec;

    /// Builds a spec aimed at a placeholder URL with the given method and body
    /// template; headers and timeout are left unset.
    fn spec_with(method: Option<&str>, body: Option<&str>) -> WebhookInvokeSpec {
        WebhookInvokeSpec {
            url: "http://example.com".to_string(),
            method: method.map(String::from),
            headers: None,
            body: body.map(String::from),
            timeout: None,
        }
    }

    /// A simple template is rendered with values from the context.
    #[test]
    fn test_render_webhook_body_happy_path() {
        let spec = spec_with(Some("POST"), Some("Hello {{ inputs.name }}!"));
        let ctx = json!({ "inputs": { "name": "World" } });

        let rendered = render_webhook_body(&spec, &ctx).unwrap();

        assert_eq!(rendered, Some("Hello World!".to_string()));
    }

    /// A spec without a body renders to `None`.
    #[test]
    fn test_render_webhook_body_no_body() {
        let spec = spec_with(Some("POST"), None);
        let ctx = json!({});

        let rendered = render_webhook_body(&spec, &ctx).unwrap();

        assert_eq!(rendered, None);
    }

    /// An unterminated expression in the template is reported as an error.
    #[test]
    fn test_render_webhook_body_error() {
        let spec_invalid = spec_with(None, Some("Hello {{ inputs.name"));
        let ctx = json!({ "inputs": { "name": "World" } });

        let result = render_webhook_body(&spec_invalid, &ctx);

        assert!(result.is_err());
    }

    /// Nested lookups across several top-level context keys are resolved.
    #[test]
    fn test_render_webhook_body_with_complex_ctx() {
        let spec = spec_with(
            None,
            Some("Run ID: {{ run.id }}, Step result: {{ steps.test.status }}"),
        );
        let ctx = json!({
            "run": { "id": "123" },
            "steps": {
                "test": { "status": "success" }
            }
        });

        let rendered = render_webhook_body(&spec, &ctx).unwrap();

        assert_eq!(
            rendered,
            Some("Run ID: 123, Step result: success".to_string())
        );
    }
}