Skip to main content

stormchaser_engine/handler/integrations/
webhook.rs

1use crate::handler::{fetch_outputs, fetch_run_context, fetch_step_instance};
2use anyhow::Result;
3use chrono::Utc;
4use serde_json::Value;
5use sqlx::PgPool;
6use std::time::Duration;
7use tracing::info;
8use uuid::Uuid;
9
10/// Handle webhook invoke.
11pub async fn handle_webhook_invoke(
12    run_id: Uuid,
13    step_id: Uuid,
14    spec: Value,
15    pool: PgPool,
16    nats_client: async_nats::Client,
17) -> Result<()> {
18    use stormchaser_model::dsl::WebhookInvokeSpec;
19
20    let spec: WebhookInvokeSpec = serde_json::from_value(spec)?;
21
22    info!("Invoking webhook {} for run {}", spec.url, run_id);
23
24    // 1. Mark as Running
25    let instance = fetch_step_instance(step_id, &pool).await?;
26    let machine =
27        crate::step_machine::StepMachine::<crate::step_machine::state::Pending>::from_instance(
28            instance,
29        );
30    let _ = machine
31        .start("webhook".to_string(), &mut *pool.acquire().await?)
32        .await?;
33
34    // 2. Prepare Context for Template Rendering
35    let run_context = fetch_run_context(run_id, &pool).await?;
36    let outputs = fetch_outputs(run_id, &pool).await?;
37
38    let template_ctx = serde_json::json!({
39        "inputs": run_context.inputs,
40        "steps": outputs,
41        "run": {
42            "id": run_id.to_string(),
43        }
44    });
45
46    // 3. Render Body if present
47    let rendered_body = render_webhook_body(&spec, &template_ctx)?;
48
49    // 4. Build and Execute Request
50    execute_webhook_request(run_id, step_id, &spec, rendered_body, pool, nats_client).await
51}
52
53use stormchaser_model::dsl;
54
55fn render_webhook_body(
56    spec: &dsl::WebhookInvokeSpec,
57    template_ctx: &Value,
58) -> Result<Option<String>> {
59    use minijinja::Environment;
60    if let Some(body_tmpl) = &spec.body {
61        let env = Environment::new();
62        Ok(Some(env.render_str(body_tmpl, template_ctx).map_err(
63            |e| anyhow::anyhow!("Failed to render webhook body: {:?}", e),
64        )?))
65    } else {
66        Ok(None)
67    }
68}
69
70async fn execute_webhook_request(
71    run_id: Uuid,
72    step_id: Uuid,
73    spec: &dsl::WebhookInvokeSpec,
74    rendered_body: Option<String>,
75    pool: PgPool,
76    nats_client: async_nats::Client,
77) -> Result<()> {
78    let client = reqwest::Client::new();
79    let method = match spec
80        .method
81        .as_deref()
82        .unwrap_or("POST")
83        .to_uppercase()
84        .as_str()
85    {
86        "GET" => reqwest::Method::GET,
87        "POST" => reqwest::Method::POST,
88        "PUT" => reqwest::Method::PUT,
89        "DELETE" => reqwest::Method::DELETE,
90        "PATCH" => reqwest::Method::PATCH,
91        _ => reqwest::Method::POST,
92    };
93
94    let mut builder = client.request(method, &spec.url);
95
96    if let Some(headers) = &spec.headers {
97        for (k, v) in headers {
98            builder = builder.header(k, v);
99        }
100    }
101
102    if let Some(body) = rendered_body {
103        builder = builder.body(body);
104    }
105
106    let timeout = spec
107        .timeout
108        .as_ref()
109        .and_then(|t| humantime::parse_duration(t).ok())
110        .unwrap_or(Duration::from_secs(30));
111
112    let res = builder.timeout(timeout).send().await?;
113    let status = res.status();
114
115    if status.is_success() {
116        let body_bytes = res.bytes().await?;
117        let body_val: Value = serde_json::from_slice(&body_bytes).unwrap_or_else(
118            |_| serde_json::json!({ "text": String::from_utf8_lossy(&body_bytes) }),
119        );
120
121        let instance = fetch_step_instance(step_id, &pool).await?;
122        let machine =
123            crate::step_machine::StepMachine::<crate::step_machine::state::Running>::from_instance(
124                instance,
125            );
126        let _ = machine.succeed(&mut *pool.acquire().await?).await?;
127
128        let event = serde_json::json!({
129            "run_id": run_id,
130            "step_id": step_id,
131            "event_type": "step_completed",
132            "outputs": body_val,
133            "timestamp": Utc::now(),
134        });
135        let js = async_nats::jetstream::new(nats_client);
136        js.publish("stormchaser.step.completed", event.to_string().into())
137            .await?;
138        Ok(())
139    } else {
140        let error_body = res
141            .text()
142            .await
143            .unwrap_or_else(|_| "Unknown error".to_string());
144        anyhow::bail!("Webhook failed with status {}: {}", status, error_body);
145    }
146}
147
148#[cfg(test)]
149mod tests {
150    use super::*;
151    use serde_json::json;
152    use stormchaser_model::dsl::WebhookInvokeSpec;
153
154    #[test]
155    fn test_render_webhook_body_happy_path() {
156        let spec = WebhookInvokeSpec {
157            url: "http://example.com".to_string(),
158            method: Some("POST".to_string()),
159            headers: None,
160            body: Some("Hello {{ inputs.name }}!".to_string()),
161            timeout: None,
162        };
163        let ctx = json!({
164            "inputs": {
165                "name": "World"
166            }
167        });
168        let rendered = render_webhook_body(&spec, &ctx).unwrap();
169        assert_eq!(rendered, Some("Hello World!".to_string()));
170    }
171
172    #[test]
173    fn test_render_webhook_body_no_body() {
174        let spec = WebhookInvokeSpec {
175            url: "http://example.com".to_string(),
176            method: Some("POST".to_string()),
177            headers: None,
178            body: None,
179            timeout: None,
180        };
181        let ctx = json!({});
182        let rendered = render_webhook_body(&spec, &ctx).unwrap();
183        assert_eq!(rendered, None);
184    }
185
186    #[test]
187    fn test_render_webhook_body_error() {
188        let spec_invalid = WebhookInvokeSpec {
189            url: "http://example.com".to_string(),
190            method: None,
191            headers: None,
192            body: Some("Hello {{ inputs.name".to_string()), // missing closing }}
193            timeout: None,
194        };
195        let ctx = json!({"inputs": {"name": "World"}});
196        let result = render_webhook_body(&spec_invalid, &ctx);
197        assert!(result.is_err());
198    }
199
200    #[test]
201    fn test_render_webhook_body_with_complex_ctx() {
202        let spec = WebhookInvokeSpec {
203            url: "http://example.com".to_string(),
204            method: None,
205            headers: None,
206            body: Some("Run ID: {{ run.id }}, Step result: {{ steps.test.status }}".to_string()),
207            timeout: None,
208        };
209        let ctx = json!({
210            "run": {
211                "id": "123"
212            },
213            "steps": {
214                "test": {
215                    "status": "success"
216                }
217            }
218        });
219        let rendered = render_webhook_body(&spec, &ctx).unwrap();
220        assert_eq!(
221            rendered,
222            Some("Run ID: 123, Step result: success".to_string())
223        );
224    }
225}