// greentic_runner_host/runner/adapt_webhook.rs
use axum::BoxError;
2use axum::body::Body;
3use axum::extract::Path;
4use axum::http::{
5 HeaderMap, HeaderName, HeaderValue, Method, Response as AxumResponse, StatusCode, Uri,
6};
7use axum::response::IntoResponse;
8use serde_json::{Map, Value, json};
9
10use crate::engine::runtime::IngressEnvelope;
11use crate::routing::TenantRuntimeHandle;
12use crate::runtime::TenantRuntime;
13
14pub async fn dispatch(
15 TenantRuntimeHandle { tenant, runtime }: TenantRuntimeHandle,
16 Path(flow_id): Path<String>,
17 method: Method,
18 uri: Uri,
19 headers: HeaderMap,
20 body: axum::body::Bytes,
21) -> Result<AxumResponse<Body>, AxumResponse<Body>> {
22 let engine = runtime.engine();
23 let flow = engine
24 .flow_by_id(&flow_id)
25 .ok_or_else(|| build_error(StatusCode::NOT_FOUND, "flow not found"))?;
26
27 if flow.flow_type != "webhook" {
28 return Err(build_error(
29 StatusCode::CONFLICT,
30 "flow is not registered as a webhook",
31 ));
32 }
33
34 if !runtime.config().webhook_policy.is_allowed(uri.path()) {
35 return Err(build_error(
36 StatusCode::FORBIDDEN,
37 "path not permitted by policy",
38 ));
39 }
40
41 let idempotency_key = headers
42 .get("Idempotency-Key")
43 .and_then(|value| value.to_str().ok())
44 .map(|value| value.to_string());
45
46 if let Some(key) = idempotency_key.as_ref()
47 && let Some(cached) = lookup_cached(runtime.as_ref(), key)
48 {
49 tracing::debug!(flow_id = %flow.id, idempotency_key = key, "webhook cache hit");
50 return build_response(cached).map_err(|_| {
51 build_error(StatusCode::INTERNAL_SERVER_ERROR, "cached response invalid")
52 });
53 }
54
55 let normalized = normalize_request(&method, &uri, &headers, &body);
56 let envelope = IngressEnvelope {
57 tenant: tenant.clone(),
58 env: None,
59 pack_id: Some(flow.pack_id.clone()),
60 flow_id: flow.id.clone(),
61 flow_type: Some(flow.flow_type.clone()),
62 action: Some("webhook".into()),
63 session_hint: idempotency_key.clone(),
64 provider: Some("webhook".into()),
65 channel: Some(uri.path().to_string()),
66 conversation: Some(uri.path().to_string()),
67 user: None,
68 activity_id: idempotency_key.clone(),
69 timestamp: None,
70 payload: normalized,
71 metadata: None,
72 reply_scope: None,
73 }
74 .canonicalize();
75
76 match runtime.state_machine().handle(envelope).await {
77 Ok(value) => {
78 if let Some(key) = idempotency_key {
79 insert_cache(runtime.as_ref(), key.clone(), value.clone());
80 }
81 Ok(build_response(value).unwrap_or_else(|err| {
82 tracing::error!(flow_id = %flow.id, error = %err, "failed to render webhook response");
83 build_error(StatusCode::INTERNAL_SERVER_ERROR, "malformed flow response")
84 }))
85 }
86 Err(err) => {
87 let chain = err.chain().map(|e| e.to_string()).collect::<Vec<_>>();
88 tracing::error!(
89 flow_id = %flow.id,
90 error.cause_chain = ?chain,
91 "webhook flow execution failed"
92 );
93 Err(build_error(
94 StatusCode::INTERNAL_SERVER_ERROR,
95 "webhook flow failed",
96 ))
97 }
98 }
99}
100
101fn lookup_cached(runtime: &TenantRuntime, key: &str) -> Option<Value> {
102 let mut cache = runtime.webhook_cache().lock();
103 cache.get(key).cloned()
104}
105
106fn insert_cache(runtime: &TenantRuntime, key: String, value: Value) {
107 let mut cache = runtime.webhook_cache().lock();
108 cache.put(key, value);
109}
110
111fn normalize_request(method: &Method, uri: &Uri, headers: &HeaderMap, body: &[u8]) -> Value {
112 let headers_json = headers.iter().fold(Map::new(), |mut acc, (name, value)| {
113 acc.insert(
114 name.as_str().to_string(),
115 Value::String(value.to_str().unwrap_or_default().to_string()),
116 );
117 acc
118 });
119
120 let body_value = if body.is_empty() {
121 Value::Null
122 } else if let Ok(text) = std::str::from_utf8(body) {
123 json!({ "text": text })
124 } else {
125 json!({ "base16": hex::encode(body) })
126 };
127
128 json!({
129 "method": method.as_str(),
130 "path": uri.path(),
131 "query": uri.query(),
132 "headers": headers_json,
133 "body": body_value,
134 })
135}
136
137fn build_response(value: Value) -> Result<AxumResponse<Body>, BoxError> {
138 let mut builder = AxumResponse::builder().status(StatusCode::OK);
139 let mut headers = HeaderMap::new();
140 let body;
141
142 match value {
143 Value::String(text) => {
144 body = Body::from(text);
145 }
146 Value::Object(map) => {
147 if let Some(status) = map
148 .get("status")
149 .and_then(|status| status.as_u64())
150 .and_then(|status| u16::try_from(status).ok())
151 {
152 builder = builder.status(StatusCode::from_u16(status)?);
153 }
154 if let Some(headers_value) = map.get("headers").and_then(|h| h.as_object()) {
155 for (key, value) in headers_value {
156 if let Some(value_str) = value.as_str()
157 && let Ok(header_name) = key.parse::<HeaderName>()
158 && let Ok(header_value) = HeaderValue::from_str(value_str)
159 {
160 headers.insert(header_name, header_value);
161 }
162 }
163 }
164 if let Some(body_value) = map.get("body") {
165 body = serialize_body(body_value);
166 } else if let Some(text_value) = map.get("text") {
167 body = serialize_body(text_value);
168 } else {
169 body = Body::from(Value::Object(map).to_string());
170 }
171 }
172 other => {
173 body = Body::from(other.to_string());
174 }
175 }
176
177 let mut final_response = builder.body(body)?;
178 *final_response.headers_mut() = headers;
179 Ok(final_response)
180}
181
182fn serialize_body(value: &Value) -> Body {
183 match value {
184 Value::String(text) => Body::from(text.clone()),
185 Value::Object(obj) if obj.contains_key("text") => obj
186 .get("text")
187 .and_then(Value::as_str)
188 .map(|text| Body::from(text.to_owned()))
189 .unwrap_or_else(|| Body::from(value.to_string())),
190 Value::Object(obj) if obj.contains_key("base16") => obj
191 .get("base16")
192 .and_then(Value::as_str)
193 .and_then(|hex_value| hex::decode(hex_value).ok())
194 .map(Body::from)
195 .unwrap_or_else(|| Body::from(value.to_string())),
196 _ => Body::from(value.to_string()),
197 }
198}
199
200fn build_error(status: StatusCode, message: &'static str) -> AxumResponse<Body> {
201 let payload = json!({ "error": message });
202 (status, axum::Json(payload)).into_response()
203}
204
#[cfg(test)]
mod tests {
    use super::*;
    use axum::http::{Method, Uri};

    /// A POST with one header, a query string and a UTF-8 body is reflected
    /// field by field in the normalized payload.
    #[test]
    fn normalize_request_serializes_headers_and_body() {
        let raw_body = br#"{"hello":"world"}"#;
        let target: Uri = "/hook?query=1".parse().unwrap();
        let mut request_headers = HeaderMap::new();
        request_headers.insert("X-Test", HeaderValue::from_static("value"));

        let payload = normalize_request(&Method::POST, &target, &request_headers, raw_body);

        assert_eq!(payload["method"], json!("POST"));
        assert_eq!(payload["path"], json!("/hook"));
        assert_eq!(payload["query"], json!("query=1"));
        // Header names are reported in lower case.
        assert_eq!(payload["headers"]["x-test"], json!("value"));
        assert_eq!(payload["body"]["text"], json!(r#"{"hello":"world"}"#));
    }

    /// Both the bare-string form and the structured object form of a flow
    /// result can be rendered.
    #[test]
    fn build_response_accepts_string_and_object_forms() {
        let from_string = build_response(json!("plain text")).unwrap();
        assert_eq!(from_string.status(), StatusCode::OK);

        let structured = json!({
            "status": 202,
            "headers": { "X-Custom": "true" },
            "body": { "text": "json text" }
        });
        let from_object = build_response(structured).unwrap();
        assert_eq!(from_object.status(), StatusCode::ACCEPTED);
        assert_eq!(from_object.headers().get("X-Custom").unwrap(), "true");
    }
}
240}