use actix_web::HttpRequest;
use actix_web::web::Data;
use regex::Regex;
use reqwest::{Method, header::HeaderMap, header::HeaderName, header::HeaderValue};
use serde_json::{Value, json};
use sha256::digest;
use sqlx::postgres::PgPool;
use std::sync::LazyLock;
use std::time::Instant;
use tracing::{error, warn};
use crate::AppState;
use crate::api::client_context::try_auth_pool;
use crate::data::webhooks::{
GatewayWebhookRecord, insert_webhook_delivery_pending, list_webhooks_for_dispatch,
update_webhook_delivery_outcome,
};
const RESPONSE_SNIPPET_MAX: usize = 8192;
const CONTEXT_JSON_MAX_DEPTH: usize = 12;
const CONTEXT_MAX_STRING: usize = 4096;
static ATHENA_PLACEHOLDER: LazyLock<Regex> = LazyLock::new(|| {
Regex::new(r"\{\{\s*ATHENA\.([a-zA-Z_][a-zA-Z0-9_]*)\.([a-zA-Z_][a-zA-Z0-9_]*)\.([a-zA-Z_][a-zA-Z0-9_]*)\s*\}\}")
.expect("ATHENA placeholder regex")
});
static GENERIC_PLACEHOLDER: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"\{\{\s*([^}]+?)\s*\}\}").expect("generic placeholder regex"));
pub fn gateway_webhook_trigger_from_http(
req: &HttpRequest,
client_name: &str,
route_key: &str,
table_name: Option<String>,
request_id: Option<String>,
payload: Option<Value>,
response: Option<Value>,
) -> GatewayWebhookTrigger {
let mut headers: Vec<(String, String)> = Vec::new();
for (name, value) in req.headers() {
if let Ok(vs) = value.to_str() {
headers.push((name.as_str().to_string(), vs.to_string()));
}
}
GatewayWebhookTrigger {
client_name: client_name.to_string(),
route_key: route_key.to_string(),
table_name,
request_id,
request_method: req.method().as_str().to_string(),
request_path: req.path().to_string(),
headers,
payload,
response,
}
}
#[derive(Debug, Clone)]
pub struct GatewayWebhookTrigger {
pub client_name: String,
pub route_key: String,
pub table_name: Option<String>,
pub request_id: Option<String>,
pub request_method: String,
pub request_path: String,
pub headers: Vec<(String, String)>,
pub payload: Option<Value>,
pub response: Option<Value>,
}
pub fn spawn_gateway_webhook_dispatch(app_state: Data<AppState>, trigger: GatewayWebhookTrigger) {
tokio::spawn(async move {
run_gateway_webhook_dispatch(&app_state, trigger).await;
});
}
pub async fn dispatch_webhook_test(
app_state: &AppState,
catalog_pool: &PgPool,
webhook: &GatewayWebhookRecord,
trigger: GatewayWebhookTrigger,
) -> Result<(i64, String, Option<i32>, Option<String>), String> {
let fp: String = fingerprint_from_gate_response(&trigger.response);
let idempotency_key: String = delivery_idempotency_key(webhook.id, &trigger, &fp);
let context_snapshot: Value = build_context_snapshot(&trigger, webhook);
let delivery_id: Option<i64> = insert_webhook_delivery_pending(
catalog_pool,
webhook.id,
&trigger.route_key,
trigger.request_id.as_deref(),
&idempotency_key,
context_snapshot,
)
.await
.map_err(|e| e.to_string())?;
let delivery_id: i64 = match delivery_id {
Some(id) => id,
None => {
return Err("duplicate idempotency key — delivery already recorded".to_string());
}
};
match execute_webhook_http(app_state, webhook, &trigger).await {
Ok((resolved_url, status, snippet)) => {
update_webhook_delivery_outcome(
catalog_pool,
delivery_id,
"success",
Some(status.as_u16() as i32),
None,
Some(snippet.as_str()),
None,
None,
Some(resolved_url.as_str()),
)
.await
.map_err(|e| e.to_string())?;
Ok((
delivery_id,
resolved_url,
Some(status.as_u16() as i32),
Some(snippet),
))
}
Err(err_msg) => {
update_webhook_delivery_outcome(
catalog_pool,
delivery_id,
"failed",
None,
None,
None,
Some(err_msg.as_str()),
None,
None,
)
.await
.map_err(|e| e.to_string())?;
Err(err_msg)
}
}
}
async fn run_gateway_webhook_dispatch(app_state: &AppState, trigger: GatewayWebhookTrigger) {
let Some(catalog_pool) = try_auth_pool(app_state) else {
warn!("gateway webhooks: auth/logging catalog pool unavailable; skipping");
return;
};
let instance_url: Option<String> = std::env::var("ATHENA_WEBHOOK_PUBLIC_BASE_URL")
.ok()
.map(|s| normalize_base_url(&s));
let table_ref: Option<&str> = trigger.table_name.as_deref();
let hooks: Vec<GatewayWebhookRecord> = match list_webhooks_for_dispatch(
&catalog_pool,
&trigger.client_name,
&trigger.route_key,
table_ref,
instance_url.as_deref(),
)
.await
{
Ok(h) => h,
Err(err) => {
error!(error = %err, "gateway webhooks: list failed");
return;
}
};
if hooks.is_empty() {
return;
}
let fp: String = fingerprint_from_gate_response(&trigger.response);
for hook in hooks {
let idempotency_key: String = delivery_idempotency_key(hook.id, &trigger, &fp);
let context_snapshot: Value = build_context_snapshot(&trigger, &hook);
let delivery_id: Option<i64> = match insert_webhook_delivery_pending(
&catalog_pool,
hook.id,
&trigger.route_key,
trigger.request_id.as_deref(),
&idempotency_key,
context_snapshot,
)
.await
{
Ok(id) => id,
Err(err) => {
error!(error = %err, webhook_id = hook.id, "gateway webhooks: insert delivery failed");
continue;
}
};
let delivery_id: i64 = match delivery_id {
Some(id) => id,
None => continue,
};
let started: Instant = Instant::now();
match execute_webhook_http(app_state, &hook, &trigger).await {
Ok((resolved_url, status, snippet)) => {
let duration_ms: i64 = started.elapsed().as_millis() as i64;
if let Err(err) = update_webhook_delivery_outcome(
&catalog_pool,
delivery_id,
"success",
Some(status.as_u16() as i32),
None,
Some(snippet.as_str()),
None,
Some(duration_ms),
Some(resolved_url.as_str()),
)
.await
{
error!(error = %err, "gateway webhooks: finalize success delivery failed");
}
}
Err(err_msg) => {
let duration_ms: i64 = started.elapsed().as_millis() as i64;
if let Err(err) = update_webhook_delivery_outcome(
&catalog_pool,
delivery_id,
"failed",
None,
None,
None,
Some(err_msg.as_str()),
Some(duration_ms),
None,
)
.await
{
error!(error = %err, "gateway webhooks: finalize failed delivery failed");
}
}
}
}
}
fn normalize_base_url(url: &str) -> String {
url.trim().trim_end_matches('/').to_string()
}
fn fingerprint_from_gate_response(response: &Option<Value>) -> String {
let Some(v) = response else {
return String::new();
};
if let Some(rid) = v.get("resource_id").and_then(|x| x.as_str()) {
return rid.to_string();
}
if let Some(d) = v.get("data") {
if let Some(rid) = d.get("resource_id").and_then(|x| x.as_str()) {
return rid.to_string();
}
}
v.to_string().chars().take(512).collect()
}
fn delivery_idempotency_key(
webhook_id: i64,
trigger: &GatewayWebhookTrigger,
fingerprint: &str,
) -> String {
let rid: &str = trigger.request_id.as_deref().unwrap_or("");
let tab: &str = trigger.table_name.as_deref().unwrap_or("");
digest(format!(
"{}|{}|{}|{}|{}",
webhook_id, rid, trigger.route_key, tab, fingerprint
))
}
fn build_context_snapshot(
trigger: &GatewayWebhookTrigger,
webhook: &GatewayWebhookRecord,
) -> Value {
let headers_map: serde_json::Map<String, Value> = trigger
.headers
.iter()
.map(|(k, v)| {
(
header_key_to_ctx_key(k),
json!(truncate_str(v, CONTEXT_MAX_STRING)),
)
})
.collect();
let mut base: serde_json::Map<String, Value> = serde_json::Map::new();
base.insert(
"request".to_string(),
json!({
"method": trigger.request_method,
"path": trigger.request_path,
}),
);
base.insert("client".to_string(), json!(trigger.client_name));
base.insert("route_key".to_string(), json!(trigger.route_key));
base.insert(
"table".to_string(),
json!(trigger.table_name.clone().unwrap_or_default()),
);
base.insert("headers".to_string(), Value::Object(headers_map));
if webhook.include_request_body_in_context {
if let Some(p) = &trigger.payload {
base.insert("payload".to_string(), shallow_copy_bounded(p));
} else {
base.insert("payload".to_string(), Value::Null);
}
}
if let Some(r) = &trigger.response {
base.insert("response".to_string(), shallow_copy_bounded(r));
}
Value::Object(base)
}
fn shallow_copy_bounded(v: &Value) -> Value {
cap_json_value(v, 0)
}
fn cap_json_value(v: &Value, depth: usize) -> Value {
if depth > CONTEXT_JSON_MAX_DEPTH {
return json!("<max_depth>");
}
match v {
Value::Null | Value::Bool(_) => v.clone(),
Value::Number(n) => Value::Number(n.clone()),
Value::String(s) => Value::String(truncate_str(s, CONTEXT_MAX_STRING)),
Value::Array(a) => {
let take: usize = a.len().min(50);
Value::Array(
a.iter()
.take(take)
.map(|x| cap_json_value(x, depth + 1))
.collect(),
)
}
Value::Object(m) => {
let mut out: serde_json::Map<String, Value> = serde_json::Map::new();
for (i, (k, val)) in m.iter().enumerate() {
if i >= 64 {
out.insert("_truncated".to_string(), json!(true));
break;
}
out.insert(k.clone(), cap_json_value(val, depth + 1));
}
Value::Object(out)
}
}
}
fn truncate_str(s: &str, max: usize) -> String {
if s.len() <= max {
return s.to_string();
}
format!("{}…", &s[..max.min(s.len())])
}
fn header_key_to_ctx_key(header_name: &str) -> String {
header_name.trim().to_lowercase().replace('-', "_")
}
fn value_at_path<'a>(v: &'a Value, path: &str) -> Option<&'a Value> {
let mut cur: &Value = v;
for part in path.split('.').map(str::trim).filter(|p| !p.is_empty()) {
match cur {
Value::Object(map) => {
cur = map.get(part)?;
}
_ => return None,
}
}
Some(cur)
}
fn render_simple_placeholders(template: &str, context: &Value) -> String {
let mut out: String = template.to_string();
for cap in GENERIC_PLACEHOLDER.captures_iter(template) {
let full: &str = cap.get(0).map(|m| m.as_str()).unwrap_or("");
let path: &str = cap.get(1).map(|m| m.as_str()).unwrap_or("").trim();
if path.starts_with("ATHENA.") {
continue;
}
let replacement: String = value_at_path(context, path)
.map(value_to_template_string)
.unwrap_or_default();
out = out.replace(full, &replacement);
}
out
}
fn value_to_template_string(v: &Value) -> String {
match v {
Value::String(s) => s.clone(),
Value::Null => String::new(),
_ => v.to_string(),
}
}
fn validate_sql_ident(segment: &str) -> bool {
!segment.is_empty()
&& segment
.chars()
.all(|c| c.is_ascii_alphanumeric() || c == '_')
}
async fn resolve_key_source(
source: &str,
trigger: &GatewayWebhookTrigger,
context: &Value,
) -> Option<String> {
let s: String = source.trim().to_string();
if let Some(rest) = s.strip_prefix("header:") {
let hk: String = header_key_to_ctx_key(rest);
let headers: &Value = context.get("headers")?;
return value_at_path(headers, &hk)
.and_then(|v| v.as_str())
.map(|x| x.to_string());
}
if let Some(rest) = s.strip_prefix("payload:") {
return value_at_path(context.get("payload")?, rest)
.map(value_to_template_string)
.filter(|x| !x.is_empty());
}
if let Some(rest) = s.strip_prefix("response:") {
return value_at_path(context.get("response")?, rest)
.map(value_to_template_string)
.filter(|x| !x.is_empty());
}
let _ = trigger;
None
}
async fn athnea_column_lookup(
app_state: &AppState,
target_client: &str,
schema: &str,
table: &str,
column: &str,
key_column: &str,
key_value: &str,
lookup_keys: &Value,
) -> Result<Option<String>, String> {
if !validate_sql_ident(schema)
|| !validate_sql_ident(table)
|| !validate_sql_ident(column)
|| !validate_sql_ident(key_column)
{
return Err("invalid ATHENA identifier".to_string());
}
let pool = app_state
.pg_registry
.get_pool(target_client)
.ok_or_else(|| {
format!(
"client '{}' pool unavailable for ATHENA lookup",
target_client
)
})?;
let lk_key: String = format!("{}.{}", schema, table);
let (eff_key_col, _eff_source): (String, String) = lookup_keys
.get(lk_key)
.and_then(|o| o.as_object())
.map(|o| {
let kc: String = o
.get("key_column")
.and_then(|v| v.as_str())
.unwrap_or("id")
.to_string();
let ks: String = o
.get("key_source")
.and_then(|v| v.as_str())
.unwrap_or("header:x-user-id")
.to_string();
(kc, ks)
})
.unwrap_or_else(|| ("id".to_string(), "header:x-user-id".to_string()));
if !validate_sql_ident(&eff_key_col) {
return Err("invalid key_column in lookup_keys".to_string());
}
let sql: String = format!(
r#"SELECT "{}"::text FROM "{}"."{}" WHERE "{}"::text = $1"#,
column, schema, table, eff_key_col
);
let row: Option<(Option<String>,)> = sqlx::query_as::<_, (Option<String>,)>(&sql)
.bind(key_value)
.fetch_optional(&pool)
.await
.map_err(|e| e.to_string())?;
Ok(row.and_then(|t| t.0))
}
async fn render_athena_placeholders(
template: &str,
app_state: &AppState,
target_client: &str,
trigger: &GatewayWebhookTrigger,
context: &Value,
lookup_keys: &Value,
) -> Result<String, String> {
let mut out: String = template.to_string();
for cap in ATHENA_PLACEHOLDER.captures_iter(template) {
let full: &str = cap.get(0).map(|m| m.as_str()).unwrap_or("");
let schema: &str = cap.get(1).map(|m| m.as_str()).unwrap_or("");
let table: &str = cap.get(2).map(|m| m.as_str()).unwrap_or("");
let column: &str = cap.get(3).map(|m| m.as_str()).unwrap_or("");
let lk_key: String = format!("{}.{}", schema, table);
let cfg: (&str, &str) = lookup_keys
.get(&lk_key)
.and_then(|o| o.as_object())
.map(|o| {
(
o.get("key_column").and_then(|v| v.as_str()).unwrap_or("id"),
o.get("key_source")
.and_then(|v| v.as_str())
.unwrap_or("header:x-user-id"),
)
})
.unwrap_or(("id", "header:x-user-id"));
let key_val: Option<String> = resolve_key_source(cfg.1, trigger, context).await;
let replacement: String = match key_val {
Some(kv) => {
match athnea_column_lookup(
app_state,
target_client,
schema,
table,
column,
cfg.0,
&kv,
lookup_keys,
)
.await?
{
Some(s) => s,
None => String::new(),
}
}
None => String::new(),
};
out = out.replace(full, replacement.as_str());
}
Ok(out)
}
async fn render_template_string(
template: &str,
app_state: &AppState,
target_client: &str,
trigger: &GatewayWebhookTrigger,
context: &Value,
lookup_keys: &Value,
) -> Result<String, String> {
let after_athnea: String = render_athena_placeholders(
template,
app_state,
target_client,
trigger,
context,
lookup_keys,
)
.await?;
Ok(render_simple_placeholders(&after_athnea, context))
}
async fn execute_webhook_http(
app_state: &AppState,
hook: &GatewayWebhookRecord,
trigger: &GatewayWebhookTrigger,
) -> Result<(String, reqwest::StatusCode, String), String> {
let context: Value = build_context_snapshot(trigger, hook);
let method: Method = hook
.http_method
.parse::<Method>()
.map_err(|e| format!("invalid webhook http_method: {}", e))?;
let lookup_keys: &Value = &hook.lookup_keys;
let resolved_url: String = render_template_string(
&hook.url_template,
app_state,
&hook.client_name,
trigger,
&context,
lookup_keys,
)
.await?;
let headers_map: &serde_json::Map<String, Value> = hook
.headers_templates
.as_object()
.ok_or_else(|| "headers_templates must be a JSON object".to_string())?;
let mut hdrs: HeaderMap = HeaderMap::new();
for (hk, hv) in headers_map {
let rendered_v: String = if let Value::String(s) = hv {
render_template_string(
s,
app_state,
&hook.client_name,
trigger,
&context,
lookup_keys,
)
.await?
} else {
hv.to_string()
};
let name: HeaderName =
HeaderName::try_from(hk.as_str()).map_err(|_| format!("invalid header name {}", hk))?;
let val: HeaderValue = HeaderValue::try_from(rendered_v.as_str())
.map_err(|_| format!("invalid header value for {}", hk))?;
hdrs.append(name, val);
}
let body: Option<String> = if matches!(method, Method::GET | Method::DELETE) {
None
} else if let Some(b) = &hook.body_template {
Some(
render_template_string(
b,
app_state,
&hook.client_name,
trigger,
&context,
lookup_keys,
)
.await?,
)
} else {
None
};
let timeout: std::time::Duration =
std::time::Duration::from_millis(hook.timeout_ms.clamp(1000, 300_000) as u64);
let mut req: reqwest::RequestBuilder = app_state
.client
.request(method.clone(), resolved_url.as_str())
.timeout(timeout)
.headers(hdrs);
if let Some(b) = body {
req = req.body(b);
}
let resp: reqwest::Response = req
.send()
.await
.map_err(|e| format!("HTTP client error: {}", e))?;
let status: reqwest::StatusCode = resp.status();
let text: String = resp
.text()
.await
.unwrap_or_default()
.chars()
.take(RESPONSE_SNIPPET_MAX)
.collect();
Ok((resolved_url, status, text))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn generic_placeholder_replaces_simple_path() {
let ctx: Value = json!({
"headers": { "x_user_id": "u1" },
"client": "c1"
});
let s: String = render_simple_placeholders("{{headers.x_user_id}}|{{client}}", &ctx);
assert_eq!(s, "u1|c1");
}
#[test]
fn generic_placeholder_skips_athena_tokens() {
let ctx: Value = json!({ "client": "c1" });
let s: String =
render_simple_placeholders("{{client}}|{{ ATHENA.public.users.email }}", &ctx);
assert_eq!(s, "c1|{{ ATHENA.public.users.email }}");
}
#[test]
fn validate_ident_rejects_injection() {
assert!(!validate_sql_ident("foo;drop"));
assert!(validate_sql_ident("users"));
}
#[test]
fn delivery_idempotency_key_stable_for_same_inputs() {
let t: GatewayWebhookTrigger = GatewayWebhookTrigger {
client_name: "athena_logging".to_string(),
route_key: "gateway_fetch".to_string(),
table_name: Some("t1".to_string()),
request_id: Some("rid-1".to_string()),
request_method: "POST".to_string(),
request_path: "/gateway/fetch".to_string(),
headers: vec![],
payload: None,
response: Some(json!({ "data": { "resource_id": "abc" } })),
};
let a: String = delivery_idempotency_key(42, &t, "abc");
let b: String = delivery_idempotency_key(42, &t, "abc");
assert_eq!(a, b);
}
#[test]
fn fingerprint_prefers_resource_id() {
let v: Value = json!({
"data": { "resource_id": "res-9" },
"other": "noise"
});
assert_eq!(fingerprint_from_gate_response(&Some(v)), "res-9");
}
}