worldinterface_http_trigger/
handler.rs1use std::sync::RwLock;
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use axum::http::HeaderMap;
7use serde_json::Value;
8use worldinterface_core::id::FlowRunId;
9use worldinterface_host::EmbeddedHost;
10
11use crate::error::WebhookError;
12use crate::receipt::create_trigger_receipt;
13use crate::registry::WebhookRegistry;
14use crate::trigger::TriggerInput;
15
16pub async fn handle_webhook(
22 path: &str,
23 body: &[u8],
24 headers: &HeaderMap,
25 source_addr: Option<String>,
26 host: &EmbeddedHost,
27 webhook_registry: &RwLock<WebhookRegistry>,
28) -> Result<(FlowRunId, Value), WebhookError> {
29 let flow_spec = {
31 let registry = webhook_registry.read().unwrap();
32 let registration = registry
33 .get_by_path(path)
34 .ok_or_else(|| WebhookError::PathNotFound(path.to_string()))?;
35 registration.flow_spec.clone()
36 };
37
38 let body_value: Value = serde_json::from_slice(body)
40 .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(body).into_owned()));
41
42 let headers_value = headers_to_json(headers);
43
44 let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
45
46 let trigger_input = TriggerInput {
47 body: body_value,
48 headers: headers_value,
49 method: "POST".to_string(),
50 path: path.to_string(),
51 source_addr,
52 received_at: now,
53 };
54
55 let trigger_value = serde_json::to_value(&trigger_input)?;
57 let flow_run_id = host.submit_flow_with_trigger_input(flow_spec, trigger_value).await?;
58
59 let receipt = create_trigger_receipt(flow_run_id, &trigger_input);
61 let receipt_value = serde_json::to_value(&receipt)?;
62 host.store_trigger_receipt(flow_run_id, &receipt_value)?;
63
64 tracing::info!(
65 flow_run_id = %flow_run_id,
66 webhook_path = %path,
67 "webhook triggered flow run"
68 );
69
70 Ok((flow_run_id, receipt_value))
71}
72
73fn headers_to_json(headers: &HeaderMap) -> Value {
75 let mut map = serde_json::Map::new();
76 for (name, value) in headers.iter() {
77 if let Ok(v) = value.to_str() {
78 map.insert(name.as_str().to_string(), Value::String(v.to_string()));
79 }
80 }
81 Value::Object(map)
82}
83
#[cfg(test)]
mod tests {
    use super::*;

    /// Distinct headers become one string entry each, keyed by header name.
    #[test]
    fn headers_to_json_converts_headers() {
        let mut header_map = HeaderMap::new();
        for (name, raw) in [("content-type", "application/json"), ("x-custom", "value")] {
            header_map.insert(name, raw.parse().unwrap());
        }

        let expected = serde_json::json!({
            "content-type": "application/json",
            "x-custom": "value",
        });
        assert_eq!(headers_to_json(&header_map), expected);
    }

    /// An empty header map produces an empty JSON object.
    #[test]
    fn headers_to_json_empty() {
        let converted = headers_to_json(&HeaderMap::new());
        assert_eq!(converted, serde_json::json!({}));
    }
}