Skip to main content

stormchaser_engine/handler/integrations/
webhook.rs

1use crate::handler::{fetch_outputs, fetch_run_context, fetch_step_instance};
2use anyhow::Result;
3use chrono::Utc;
4use serde_json::Value;
5use sqlx::PgPool;
6use std::time::Duration;
7use stormchaser_model::dsl::WebhookInvokeSpec;
8use stormchaser_model::RunId;
9use stormchaser_model::StepInstanceId;
10use tracing::info;
11
12/// Handle webhook invoke.
13pub async fn handle_webhook_invoke(
14    run_id: RunId,
15    step_id: StepInstanceId,
16    spec: Value,
17    pool: PgPool,
18    nats_client: async_nats::Client,
19) -> Result<()> {
20    let spec: WebhookInvokeSpec = serde_json::from_value(spec)?;
21
22    info!("Invoking webhook {} for run {}", spec.url, run_id);
23
24    // 1. Mark as Running
25    let instance = fetch_step_instance(step_id, &pool).await?;
26    let machine =
27        crate::step_machine::StepMachine::<crate::step_machine::state::Pending>::from_instance(
28            instance,
29        );
30    let _ = machine
31        .start("webhook".to_string(), &mut *pool.acquire().await?)
32        .await?;
33
34    // 2. Prepare Context for Template Rendering
35    let run_context = fetch_run_context(run_id, &pool).await?;
36    let outputs = fetch_outputs(run_id, &pool).await?;
37
38    let template_ctx = serde_json::json!({
39        "inputs": run_context.inputs,
40        "steps": outputs,
41        "run": {
42            "id": run_id.to_string(),
43        }
44    });
45
46    // 3. Render Body if present
47    let rendered_body = render_webhook_body(&spec, &template_ctx)?;
48
49    // 4. Build and Execute Request
50    execute_webhook_request(run_id, step_id, &spec, rendered_body, pool, nats_client).await
51}
52
53use stormchaser_model::dsl;
54
55fn render_webhook_body(
56    spec: &dsl::WebhookInvokeSpec,
57    template_ctx: &Value,
58) -> Result<Option<String>> {
59    use minijinja::Environment;
60    if let Some(body_tmpl) = &spec.body {
61        let env = Environment::new();
62        Ok(Some(env.render_str(body_tmpl, template_ctx).map_err(
63            |e| anyhow::anyhow!("Failed to render webhook body: {:?}", e),
64        )?))
65    } else {
66        Ok(None)
67    }
68}
69
70async fn execute_webhook_request(
71    run_id: RunId,
72    step_id: StepInstanceId,
73    spec: &dsl::WebhookInvokeSpec,
74    rendered_body: Option<String>,
75    pool: PgPool,
76    nats_client: async_nats::Client,
77) -> Result<()> {
78    let client = reqwest::Client::new();
79    let method = match spec
80        .method
81        .as_deref()
82        .unwrap_or("POST")
83        .to_uppercase()
84        .as_str()
85    {
86        "GET" => reqwest::Method::GET,
87        "POST" => reqwest::Method::POST,
88        "PUT" => reqwest::Method::PUT,
89        "DELETE" => reqwest::Method::DELETE,
90        "PATCH" => reqwest::Method::PATCH,
91        _ => reqwest::Method::POST,
92    };
93
94    let mut builder = client.request(method, &spec.url);
95
96    if let Some(headers) = &spec.headers {
97        for (k, v) in headers {
98            builder = builder.header(k, v);
99        }
100    }
101
102    if let Some(body) = rendered_body {
103        builder = builder.body(body);
104    }
105
106    let timeout = spec
107        .timeout
108        .as_ref()
109        .and_then(|t| humantime::parse_duration(t).ok())
110        .unwrap_or(Duration::from_secs(30));
111
112    let res = builder.timeout(timeout).send().await?;
113    let status = res.status();
114
115    if status.is_success() {
116        let body_bytes = res.bytes().await?;
117        let body_val: Value = serde_json::from_slice(&body_bytes).unwrap_or_else(
118            |_| serde_json::json!({ "text": String::from_utf8_lossy(&body_bytes) }),
119        );
120
121        let instance = fetch_step_instance(step_id, &pool).await?;
122        let machine =
123            crate::step_machine::StepMachine::<crate::step_machine::state::Running>::from_instance(
124                instance,
125            );
126        let _ = machine.succeed(&mut *pool.acquire().await?).await?;
127
128        let event = serde_json::json!({
129            "run_id": run_id,
130            "step_id": step_id,
131            "event_type": "step_completed",
132            "outputs": body_val,
133            "timestamp": Utc::now(),
134        });
135        let js = async_nats::jetstream::new(nats_client);
136        js.publish("stormchaser.step.completed", event.to_string().into())
137            .await?;
138        Ok(())
139    } else {
140        let error_body = res
141            .text()
142            .await
143            .unwrap_or_else(|_| "Unknown error".to_string());
144        anyhow::bail!("Webhook failed with status {}: {}", status, error_body);
145    }
146}
147
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds a spec aimed at a fixed URL with no headers or timeout; only the
    /// method and body template vary between tests.
    fn spec_with(method: Option<&str>, body: Option<&str>) -> WebhookInvokeSpec {
        WebhookInvokeSpec {
            url: "http://example.com".to_string(),
            method: method.map(String::from),
            headers: None,
            body: body.map(String::from),
            timeout: None,
        }
    }

    /// A simple template is rendered with values from the context.
    #[test]
    fn test_render_webhook_body_happy_path() {
        let spec = spec_with(Some("POST"), Some("Hello {{ inputs.name }}!"));
        let ctx = json!({ "inputs": { "name": "World" } });

        let rendered = render_webhook_body(&spec, &ctx).unwrap();

        assert_eq!(rendered.as_deref(), Some("Hello World!"));
    }

    /// A spec without a body template yields no rendered body.
    #[test]
    fn test_render_webhook_body_no_body() {
        let spec = spec_with(Some("POST"), None);

        let rendered = render_webhook_body(&spec, &json!({})).unwrap();

        assert!(rendered.is_none());
    }

    /// A syntactically broken template is reported as an error.
    #[test]
    fn test_render_webhook_body_error() {
        // The template is missing its closing `}}`.
        let spec_invalid = spec_with(None, Some("Hello {{ inputs.name"));
        let ctx = json!({ "inputs": { "name": "World" } });

        assert!(render_webhook_body(&spec_invalid, &ctx).is_err());
    }

    /// Nested lookups across several top-level context keys are resolved.
    #[test]
    fn test_render_webhook_body_with_complex_ctx() {
        let spec = spec_with(
            None,
            Some("Run ID: {{ run.id }}, Step result: {{ steps.test.status }}"),
        );
        let ctx = json!({
            "run": { "id": "123" },
            "steps": { "test": { "status": "success" } }
        });

        let rendered = render_webhook_body(&spec, &ctx).unwrap();

        assert_eq!(
            rendered.as_deref(),
            Some("Run ID: 123, Step result: success")
        );
    }
}