Skip to main content

worldinterface_http_trigger/
handler.rs

1//! Webhook invocation handler.
2
3use std::sync::RwLock;
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use axum::http::HeaderMap;
7use serde_json::Value;
8use worldinterface_core::id::FlowRunId;
9use worldinterface_host::EmbeddedHost;
10
11use crate::error::WebhookError;
12use crate::receipt::create_trigger_receipt;
13use crate::registry::WebhookRegistry;
14use crate::trigger::TriggerInput;
15
16/// Handle an incoming webhook invocation.
17///
18/// Looks up the registered webhook, builds trigger input from the request,
19/// submits the flow with trigger data injected into ContextStore, and
20/// returns the FlowRunId and receipt.
21pub async fn handle_webhook(
22    path: &str,
23    body: &[u8],
24    headers: &HeaderMap,
25    source_addr: Option<String>,
26    host: &EmbeddedHost,
27    webhook_registry: &RwLock<WebhookRegistry>,
28) -> Result<(FlowRunId, Value), WebhookError> {
29    // 1. Look up webhook registration (read lock released after clone)
30    let flow_spec = {
31        let registry = webhook_registry.read().unwrap();
32        let registration = registry
33            .get_by_path(path)
34            .ok_or_else(|| WebhookError::PathNotFound(path.to_string()))?;
35        registration.flow_spec.clone()
36    };
37
38    // 2. Build trigger input
39    let body_value: Value = serde_json::from_slice(body)
40        .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(body).into_owned()));
41
42    let headers_value = headers_to_json(headers);
43
44    let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
45
46    let trigger_input = TriggerInput {
47        body: body_value,
48        headers: headers_value,
49        method: "POST".to_string(),
50        path: path.to_string(),
51        source_addr,
52        received_at: now,
53    };
54
55    // 3. Submit flow with trigger input
56    let trigger_value = serde_json::to_value(&trigger_input)?;
57    let flow_run_id = host.submit_flow_with_trigger_input(flow_spec, trigger_value).await?;
58
59    // 4. Generate and store trigger receipt
60    let receipt = create_trigger_receipt(flow_run_id, &trigger_input);
61    let receipt_value = serde_json::to_value(&receipt)?;
62    host.store_trigger_receipt(flow_run_id, &receipt_value)?;
63
64    tracing::info!(
65        flow_run_id = %flow_run_id,
66        webhook_path = %path,
67        "webhook triggered flow run"
68    );
69
70    Ok((flow_run_id, receipt_value))
71}
72
73/// Convert HTTP headers to a flat JSON object.
74fn headers_to_json(headers: &HeaderMap) -> Value {
75    let mut map = serde_json::Map::new();
76    for (name, value) in headers.iter() {
77        if let Ok(v) = value.to_str() {
78            map.insert(name.as_str().to_string(), Value::String(v.to_string()));
79        }
80    }
81    Value::Object(map)
82}
83
84#[cfg(test)]
85mod tests {
86    use super::*;
87
88    #[test]
89    fn headers_to_json_converts_headers() {
90        let mut headers = HeaderMap::new();
91        headers.insert("content-type", "application/json".parse().unwrap());
92        headers.insert("x-custom", "value".parse().unwrap());
93
94        let json = headers_to_json(&headers);
95        let obj = json.as_object().unwrap();
96        assert_eq!(obj.len(), 2);
97        assert_eq!(obj["content-type"], "application/json");
98        assert_eq!(obj["x-custom"], "value");
99    }
100
101    #[test]
102    fn headers_to_json_empty() {
103        let headers = HeaderMap::new();
104        let json = headers_to_json(&headers);
105        assert_eq!(json, Value::Object(serde_json::Map::new()));
106    }
107}