Skip to main content

greentic_runner_host/runner/
adapt_webhook.rs

1use axum::BoxError;
2use axum::body::Body;
3use axum::extract::Path;
4use axum::http::{
5    HeaderMap, HeaderName, HeaderValue, Method, Response as AxumResponse, StatusCode, Uri,
6};
7use axum::response::IntoResponse;
8use serde_json::{Map, Value, json};
9
10use crate::engine::runtime::IngressEnvelope;
11use crate::routing::TenantRuntimeHandle;
12use crate::runtime::TenantRuntime;
13
14pub async fn dispatch(
15    TenantRuntimeHandle { tenant, runtime }: TenantRuntimeHandle,
16    Path(flow_id): Path<String>,
17    method: Method,
18    uri: Uri,
19    headers: HeaderMap,
20    body: axum::body::Bytes,
21) -> Result<AxumResponse<Body>, AxumResponse<Body>> {
22    let engine = runtime.engine();
23    let flow = engine
24        .flow_by_id(&flow_id)
25        .ok_or_else(|| build_error(StatusCode::NOT_FOUND, "flow not found"))?;
26
27    if flow.flow_type != "webhook" {
28        return Err(build_error(
29            StatusCode::CONFLICT,
30            "flow is not registered as a webhook",
31        ));
32    }
33
34    if !runtime.config().webhook_policy.is_allowed(uri.path()) {
35        return Err(build_error(
36            StatusCode::FORBIDDEN,
37            "path not permitted by policy",
38        ));
39    }
40
41    let idempotency_key = headers
42        .get("Idempotency-Key")
43        .and_then(|value| value.to_str().ok())
44        .map(|value| value.to_string());
45
46    if let Some(key) = idempotency_key.as_ref()
47        && let Some(cached) = lookup_cached(runtime.as_ref(), key)
48    {
49        tracing::debug!(flow_id = %flow.id, idempotency_key = key, "webhook cache hit");
50        return build_response(cached).map_err(|_| {
51            build_error(StatusCode::INTERNAL_SERVER_ERROR, "cached response invalid")
52        });
53    }
54
55    let normalized = normalize_request(&method, &uri, &headers, &body);
56    let envelope = IngressEnvelope {
57        tenant: tenant.clone(),
58        env: None,
59        pack_id: Some(flow.pack_id.clone()),
60        flow_id: flow.id.clone(),
61        flow_type: Some(flow.flow_type.clone()),
62        action: Some("webhook".into()),
63        session_hint: idempotency_key.clone(),
64        provider: Some("webhook".into()),
65        channel: Some(uri.path().to_string()),
66        conversation: Some(uri.path().to_string()),
67        user: None,
68        activity_id: idempotency_key.clone(),
69        timestamp: None,
70        payload: normalized,
71        metadata: None,
72        reply_scope: None,
73    }
74    .canonicalize();
75
76    match runtime.state_machine().handle(envelope).await {
77        Ok(value) => {
78            if let Some(key) = idempotency_key {
79                insert_cache(runtime.as_ref(), key.clone(), value.clone());
80            }
81            Ok(build_response(value).unwrap_or_else(|err| {
82                tracing::error!(flow_id = %flow.id, error = %err, "failed to render webhook response");
83                build_error(StatusCode::INTERNAL_SERVER_ERROR, "malformed flow response")
84            }))
85        }
86        Err(err) => {
87            let chain = err.chain().map(|e| e.to_string()).collect::<Vec<_>>();
88            tracing::error!(
89                flow_id = %flow.id,
90                error.cause_chain = ?chain,
91                "webhook flow execution failed"
92            );
93            Err(build_error(
94                StatusCode::INTERNAL_SERVER_ERROR,
95                "webhook flow failed",
96            ))
97        }
98    }
99}
100
101fn lookup_cached(runtime: &TenantRuntime, key: &str) -> Option<Value> {
102    let mut cache = runtime.webhook_cache().lock();
103    cache.get(key).cloned()
104}
105
106fn insert_cache(runtime: &TenantRuntime, key: String, value: Value) {
107    let mut cache = runtime.webhook_cache().lock();
108    cache.put(key, value);
109}
110
111fn normalize_request(method: &Method, uri: &Uri, headers: &HeaderMap, body: &[u8]) -> Value {
112    let headers_json = headers.iter().fold(Map::new(), |mut acc, (name, value)| {
113        acc.insert(
114            name.as_str().to_string(),
115            Value::String(value.to_str().unwrap_or_default().to_string()),
116        );
117        acc
118    });
119
120    let body_value = if body.is_empty() {
121        Value::Null
122    } else if let Ok(text) = std::str::from_utf8(body) {
123        json!({ "text": text })
124    } else {
125        json!({ "base16": hex::encode(body) })
126    };
127
128    json!({
129        "method": method.as_str(),
130        "path": uri.path(),
131        "query": uri.query(),
132        "headers": headers_json,
133        "body": body_value,
134    })
135}
136
137fn build_response(value: Value) -> Result<AxumResponse<Body>, BoxError> {
138    let mut builder = AxumResponse::builder().status(StatusCode::OK);
139    let mut headers = HeaderMap::new();
140    let body;
141
142    match value {
143        Value::String(text) => {
144            body = Body::from(text);
145        }
146        Value::Object(map) => {
147            if let Some(status) = map
148                .get("status")
149                .and_then(|status| status.as_u64())
150                .and_then(|status| u16::try_from(status).ok())
151            {
152                builder = builder.status(StatusCode::from_u16(status)?);
153            }
154            if let Some(headers_value) = map.get("headers").and_then(|h| h.as_object()) {
155                for (key, value) in headers_value {
156                    if let Some(value_str) = value.as_str()
157                        && let Ok(header_name) = key.parse::<HeaderName>()
158                        && let Ok(header_value) = HeaderValue::from_str(value_str)
159                    {
160                        headers.insert(header_name, header_value);
161                    }
162                }
163            }
164            if let Some(body_value) = map.get("body") {
165                body = serialize_body(body_value);
166            } else if let Some(text_value) = map.get("text") {
167                body = serialize_body(text_value);
168            } else {
169                body = Body::from(Value::Object(map).to_string());
170            }
171        }
172        other => {
173            body = Body::from(other.to_string());
174        }
175    }
176
177    let mut final_response = builder.body(body)?;
178    *final_response.headers_mut() = headers;
179    Ok(final_response)
180}
181
182fn serialize_body(value: &Value) -> Body {
183    match value {
184        Value::String(text) => Body::from(text.clone()),
185        Value::Object(obj) if obj.contains_key("text") => obj
186            .get("text")
187            .and_then(Value::as_str)
188            .map(|text| Body::from(text.to_owned()))
189            .unwrap_or_else(|| Body::from(value.to_string())),
190        Value::Object(obj) if obj.contains_key("base16") => obj
191            .get("base16")
192            .and_then(Value::as_str)
193            .and_then(|hex_value| hex::decode(hex_value).ok())
194            .map(Body::from)
195            .unwrap_or_else(|| Body::from(value.to_string())),
196        _ => Body::from(value.to_string()),
197    }
198}
199
200fn build_error(status: StatusCode, message: &'static str) -> AxumResponse<Body> {
201    let payload = json!({ "error": message });
202    (status, axum::Json(payload)).into_response()
203}
204
205#[cfg(test)]
206mod tests {
207    use super::*;
208    use axum::http::{Method, Uri};
209
210    #[test]
211    fn normalize_request_serializes_headers_and_body() {
212        let mut headers = HeaderMap::new();
213        headers.insert("X-Test", HeaderValue::from_static("value"));
214        let method = Method::POST;
215        let uri: Uri = "/hook?query=1".parse().unwrap();
216        let body = br#"{"hello":"world"}"#.to_vec();
217
218        let normalized = normalize_request(&method, &uri, &headers, &body);
219        assert_eq!(normalized["method"], json!("POST"));
220        assert_eq!(normalized["path"], json!("/hook"));
221        assert_eq!(normalized["query"], json!("query=1"));
222        assert_eq!(normalized["headers"]["x-test"], json!("value"));
223        assert_eq!(normalized["body"]["text"], json!(r#"{"hello":"world"}"#));
224    }
225
226    #[test]
227    fn build_response_accepts_string_and_object_forms() {
228        let string_response = build_response(json!("plain text")).unwrap();
229        assert_eq!(string_response.status(), StatusCode::OK);
230
231        let object_response = build_response(json!({
232            "status": 202,
233            "headers": { "X-Custom": "true" },
234            "body": { "text": "json text" }
235        }))
236        .unwrap();
237        assert_eq!(object_response.status(), StatusCode::ACCEPTED);
238        assert_eq!(object_response.headers().get("X-Custom").unwrap(), "true");
239    }
240}