use actix_web::{
HttpRequest, HttpResponse, Responder, get, post,
web::{self, Bytes, Data},
};
use serde_json::{Map, Value, json};
use sha256::digest;
use std::collections::HashMap;
use std::env;
use tracing::{error, info, warn};
use athena_webhooks::{
InboundWebhookSinkDefinition, InboundWebhookSinkPayload, InboundWebhookSinkRuntimeContext,
WEBHOOK_ERROR_CODE_WEBHOOK_SINK_NOT_FOUND, build_default_inbound_sink_row,
build_projected_inbound_sink_rows, default_secret_header_name, default_secret_query_param,
json_pointer_string, qualify_webhook_sink_table_name, resolve_inbound_sink_client_name,
summary_from_inbound_sink,
};
use crate::AppState;
use crate::api::client_context::pool_for_client;
use crate::api::response::{
api_success, bad_request, forbidden_with_code, internal_error, not_found_with_code,
unauthorized_with_code,
};
use crate::api::webhook_sink_schema_heal::insert_row_with_sink_schema_heal;
use crate::athena::postgres_clients::catalog_client_has_database_connection;
use crate::drivers::postgresql::sqlx_driver::RegisteredClient;
const MOLLIE_SINK_NAME: &str = "mollie";
#[get("/webhook-sinks/templates")]
async fn list_webhook_sink_templates(state: Data<AppState>) -> impl Responder {
let mut templates = state
.webhook_sink_registry
.as_ref()
.map(|registry| {
registry
.iter()
.map(|(name, definition)| summary_from_inbound_sink(name, definition))
.collect::<Vec<_>>()
})
.unwrap_or_default();
templates.sort_by(|left, right| left.name.cmp(&right.name));
api_success(
"Listed inbound webhook sink templates",
json!({ "templates": templates }),
)
}
#[post("/webhook-sinks/{sink_name}")]
async fn post_webhook_sink(
sink_name: web::Path<String>,
req: HttpRequest,
body: Bytes,
state: Data<AppState>,
) -> impl Responder {
ingest_sink_request(sink_name.as_str(), req, body, state).await
}
#[post("/webhooks/{sink_name}")]
async fn post_webhook_alias(
sink_name: web::Path<String>,
req: HttpRequest,
body: Bytes,
state: Data<AppState>,
) -> impl Responder {
ingest_sink_request(sink_name.as_str(), req, body, state).await
}
#[post("/mollie")]
async fn post_mollie_alias(req: HttpRequest, body: Bytes, state: Data<AppState>) -> impl Responder {
ingest_sink_request(MOLLIE_SINK_NAME, req, body, state).await
}
pub fn services(cfg: &mut web::ServiceConfig) {
cfg.service(list_webhook_sink_templates)
.service(post_webhook_sink)
.service(post_webhook_alias)
.service(post_mollie_alias);
}
async fn ingest_sink_request(
sink_name: &str,
req: HttpRequest,
body: Bytes,
state: Data<AppState>,
) -> HttpResponse {
let Some(registry) = state.webhook_sink_registry.as_ref() else {
warn!(
target: "athena::webhook_sinks",
sink_name = %sink_name,
"Inbound webhook sink request rejected because no registry is loaded"
);
return not_found_with_code(
"Inbound webhook sink definition was not found",
format!("No inbound webhook sink named '{sink_name}' is loaded"),
WEBHOOK_ERROR_CODE_WEBHOOK_SINK_NOT_FOUND,
);
};
let Some((resolved_sink_name, definition)) = find_sink_definition(registry, sink_name) else {
warn!(
target: "athena::webhook_sinks",
sink_name = %sink_name,
"Inbound webhook sink request rejected because the sink definition was not found"
);
return not_found_with_code(
"Inbound webhook sink definition was not found",
format!("No inbound webhook sink named '{sink_name}' is loaded"),
WEBHOOK_ERROR_CODE_WEBHOOK_SINK_NOT_FOUND,
);
};
if let Err(response) = enforce_sink_athena_base_url(definition, &req, resolved_sink_name) {
return response;
}
let query_params: HashMap<String, String> =
serde_urlencoded::from_str(req.query_string()).unwrap_or_default();
let query_params_json = map_to_json_object(&query_params);
let headers_json = headers_to_json_object(req.headers());
let secret_header = default_secret_header_name(definition);
let secret_query_param = default_secret_query_param(definition);
let provided_secret = req
.headers()
.get(secret_header.as_str())
.and_then(|value| value.to_str().ok())
.map(str::to_string)
.or_else(|| query_params.get(&secret_query_param).cloned());
let expected_secret = match expected_shared_secret(definition) {
Ok(secret) => secret,
Err(response) => return response,
};
let verified = match expected_secret.as_deref() {
Some(expected) => match provided_secret.as_deref() {
Some(provided) if provided == expected => true,
_ => {
let secret_error = if provided_secret.is_some() {
format!(
"Request secret did not match the configured secret for sink '{}'. Expected query param '{}' or header '{}'.",
resolved_sink_name, secret_query_param, secret_header
)
} else {
format!(
"Request secret was missing for sink '{}'. Provide it via query param '{}' or header '{}'.",
resolved_sink_name, secret_query_param, secret_header
)
};
warn!(
target: "athena::webhook_sinks",
sink_name = %resolved_sink_name,
secret_header = %secret_header,
secret_query_param = %secret_query_param,
secret_present = provided_secret.is_some(),
"Inbound webhook sink secret rejected"
);
return unauthorized_with_code(
"Inbound webhook secret rejected",
secret_error,
"BILLING_WEBHOOK_SECRET_INVALID",
);
}
},
None => false,
};
let payload_text = std::str::from_utf8(&body).ok().map(str::to_string);
let payload_json = serde_json::from_slice::<Value>(&body).ok();
let event_type = json_pointer_string(
payload_json.as_ref(),
definition.event_type_json_pointer.as_deref(),
);
let external_id = json_pointer_string(
payload_json.as_ref(),
definition.external_id_json_pointer.as_deref(),
);
let metadata = match sink_metadata(definition) {
Ok(metadata) => metadata,
Err(response) => return response,
};
let client_header = req
.headers()
.get("X-Athena-Client")
.and_then(|value| value.to_str().ok());
let Some(primary_client_name) =
resolve_inbound_sink_client_name(definition.client.as_deref(), client_header)
else {
warn!(
target: "athena::webhook_sinks",
sink_name = %resolved_sink_name,
"Inbound webhook sink request rejected because the client could not be resolved"
);
return bad_request(
"Inbound webhook sink client could not be resolved",
"Set `client` in config/webhook-sinks.yaml or provide X-Athena-Client",
);
};
let is_control_event = is_mollie_hook_ping_event(resolved_sink_name, event_type.as_deref());
let acknowledge_without_persistence = definition.acknowledge_without_persistence;
let primary_pool = match pool_for_client(state.get_ref(), &primary_client_name).await {
Ok(pool) => pool,
Err(response) => {
if acknowledge_without_persistence {
log_sink_degraded_acknowledgement(
resolved_sink_name,
&primary_client_name,
event_type.as_deref(),
external_id.as_deref(),
verified,
is_control_event,
false,
"primary_client_unavailable",
Some(response_error_message(&response).as_str()),
);
return acknowledged_without_persistence_response(
resolved_sink_name,
event_type.as_deref(),
external_id.as_deref(),
verified,
is_control_event,
false,
"primary_client_unavailable",
Some(response_error_message(&response)),
);
}
log_sink_client_unavailable(
state.get_ref(),
definition.client.as_deref(),
client_header,
resolved_sink_name,
&primary_client_name,
None,
event_type.as_deref(),
external_id.as_deref(),
verified,
&response,
"primary_client_unavailable",
)
.await;
return response;
}
};
let qualified_table_name = match qualify_webhook_sink_table_name(
definition.schema_name.as_deref(),
&definition.table_name,
) {
Ok(table_name) => table_name,
Err(error) => {
return internal_error(
"Inbound webhook sink table config is invalid",
format!("sink '{}' is invalid: {error}", resolved_sink_name),
);
}
};
let received_at = chrono::Utc::now();
let request_path = req.path().to_string();
let http_method = req.method().as_str().to_string();
let source_ip = req
.connection_info()
.realip_remote_addr()
.map(str::to_string);
let content_type = req
.headers()
.get("content-type")
.and_then(|value| value.to_str().ok())
.map(str::to_string);
let query_params_value = Value::Object(query_params_json.clone());
let headers_value = Value::Object(headers_json.clone());
let raw_row = build_default_inbound_sink_row(InboundWebhookSinkPayload {
sink_name: resolved_sink_name,
received_at,
request_path: request_path.as_str(),
http_method: http_method.as_str(),
source_ip: source_ip.as_deref(),
content_type: content_type.as_deref(),
query_params_json: query_params_value.clone(),
headers_json: headers_value.clone(),
payload_json: payload_json.clone(),
payload_text: payload_text.as_deref(),
verified,
event_type: event_type.clone(),
external_id: external_id.clone(),
metadata: metadata.clone(),
});
let inserted_row = match insert_row_with_sink_schema_heal(
resolved_sink_name,
&primary_client_name,
&primary_pool,
&qualified_table_name,
&raw_row,
)
.await
{
Ok(row) => row,
Err(error) => {
if acknowledge_without_persistence {
log_sink_degraded_acknowledgement(
resolved_sink_name,
&primary_client_name,
event_type.as_deref(),
external_id.as_deref(),
verified,
is_control_event,
false,
"primary_insert_failed",
Some(error.to_string().as_str()),
);
return acknowledged_without_persistence_response(
resolved_sink_name,
event_type.as_deref(),
external_id.as_deref(),
verified,
is_control_event,
false,
"primary_insert_failed",
Some(error.to_string()),
);
}
error!(
target: "athena::webhook_sinks",
sink_name = %resolved_sink_name,
client_name = %primary_client_name,
event_type = event_type.as_deref().unwrap_or(""),
external_id = external_id.as_deref().unwrap_or(""),
verified,
table_name = %qualified_table_name,
error = %error,
error_chain = %format!("{error:#}"),
"Failed to persist inbound webhook sink event"
);
return internal_error(
"Failed to persist inbound webhook sink event",
error.to_string(),
);
}
};
let payload_sha256 = digest(payload_text.as_deref().unwrap_or_default());
let runtime_context = InboundWebhookSinkRuntimeContext {
sink_name: resolved_sink_name,
received_at,
request_path: request_path.as_str(),
http_method: http_method.as_str(),
source_ip: source_ip.as_deref(),
content_type: content_type.as_deref(),
verified,
event_type: event_type.as_deref(),
external_id: external_id.as_deref(),
query_params_json: &query_params_value,
headers_json: &headers_value,
payload_json: payload_json.as_ref(),
payload_text: payload_text.as_deref(),
payload_sha256: &payload_sha256,
metadata: &metadata,
};
let projected_rows = match build_projected_inbound_sink_rows(definition, &runtime_context) {
Ok(rows) => rows,
Err(error) => {
if acknowledge_without_persistence {
log_sink_degraded_acknowledgement(
resolved_sink_name,
&primary_client_name,
event_type.as_deref(),
external_id.as_deref(),
verified,
is_control_event,
true,
"projection_build_failed",
Some(error.as_str()),
);
return acknowledged_without_persistence_response(
resolved_sink_name,
event_type.as_deref(),
external_id.as_deref(),
verified,
is_control_event,
true,
"projection_build_failed",
Some(error),
);
}
error!(
target: "athena::webhook_sinks",
sink_name = %resolved_sink_name,
client_name = %primary_client_name,
event_type = event_type.as_deref().unwrap_or(""),
external_id = external_id.as_deref().unwrap_or(""),
verified,
error = %error,
"Failed to build inbound webhook sink projection rows"
);
return internal_error(
"Failed to build inbound webhook sink projection rows",
error,
);
}
};
let mut target_inserts = Vec::with_capacity(projected_rows.len());
for (target_client_override, target_table_name, projected_row) in projected_rows {
let Some(target_client_name) = resolve_inbound_sink_client_name(
target_client_override.as_deref(),
Some(&primary_client_name),
) else {
error!(
target: "athena::webhook_sinks",
sink_name = %resolved_sink_name,
client_name = %primary_client_name,
target_table_name = %target_table_name,
event_type = event_type.as_deref().unwrap_or(""),
external_id = external_id.as_deref().unwrap_or(""),
"Inbound webhook sink projection client could not be resolved"
);
return bad_request(
"Inbound webhook sink projection client could not be resolved",
format!(
"sink '{}' could not resolve a client for target table '{}'",
resolved_sink_name, target_table_name
),
);
};
let target_pool = match pool_for_client(state.get_ref(), &target_client_name).await {
Ok(pool) => pool,
Err(response) => {
if acknowledge_without_persistence {
log_sink_degraded_acknowledgement(
resolved_sink_name,
&target_client_name,
event_type.as_deref(),
external_id.as_deref(),
verified,
is_control_event,
true,
"projection_client_unavailable",
Some(response_error_message(&response).as_str()),
);
return acknowledged_without_persistence_response(
resolved_sink_name,
event_type.as_deref(),
external_id.as_deref(),
verified,
is_control_event,
true,
"projection_client_unavailable",
Some(response_error_message(&response)),
);
}
log_sink_client_unavailable(
state.get_ref(),
target_client_override
.as_deref()
.or(definition.client.as_deref()),
Some(&primary_client_name),
resolved_sink_name,
&target_client_name,
Some(&target_table_name),
event_type.as_deref(),
external_id.as_deref(),
verified,
&response,
"projection_client_unavailable",
)
.await;
return response;
}
};
let inserted_target_row = match insert_row_with_sink_schema_heal(
resolved_sink_name,
&target_client_name,
&target_pool,
&target_table_name,
&projected_row,
)
.await
{
Ok(row) => row,
Err(error) => {
if acknowledge_without_persistence {
log_sink_degraded_acknowledgement(
resolved_sink_name,
&target_client_name,
event_type.as_deref(),
external_id.as_deref(),
verified,
is_control_event,
true,
"projection_insert_failed",
Some(error.to_string().as_str()),
);
return acknowledged_without_persistence_response(
resolved_sink_name,
event_type.as_deref(),
external_id.as_deref(),
verified,
is_control_event,
true,
"projection_insert_failed",
Some(error.to_string()),
);
}
error!(
target: "athena::webhook_sinks",
sink_name = %resolved_sink_name,
client_name = %target_client_name,
target_table_name = %target_table_name,
event_type = event_type.as_deref().unwrap_or(""),
external_id = external_id.as_deref().unwrap_or(""),
verified,
error = %error,
error_chain = %format!("{error:#}"),
"Failed to persist inbound webhook sink projection row"
);
return internal_error(
"Failed to persist inbound webhook sink projection row",
format!(
"sink '{}' target '{}' failed: {}",
resolved_sink_name, target_table_name, error
),
);
}
};
target_inserts.push(json!({
"clientName": target_client_name,
"tableName": target_table_name,
"row": inserted_target_row
}));
}
info!(
target: "athena::webhook_sinks",
sink_name = %resolved_sink_name,
client_name = %primary_client_name,
event_type = event_type.as_deref().unwrap_or(""),
external_id = external_id.as_deref().unwrap_or(""),
verified,
control_event = is_control_event,
degraded_persistence = false,
target_insert_count = target_inserts.len(),
"Inbound webhook sink event stored"
);
api_success(
"Stored inbound webhook sink event",
json!({
"sink": summary_from_inbound_sink(resolved_sink_name, definition),
"verified": verified,
"eventType": event_type,
"externalId": external_id,
"row": inserted_row,
"targetInserts": target_inserts
}),
)
}
fn is_mollie_hook_ping_event(sink_name: &str, event_type: Option<&str>) -> bool {
sink_name.eq_ignore_ascii_case(MOLLIE_SINK_NAME) && event_type == Some("hook.ping")
}
fn acknowledged_without_persistence_response(
sink_name: &str,
event_type: Option<&str>,
external_id: Option<&str>,
verified: bool,
control_event: bool,
persisted_primary: bool,
reason: &str,
detail: Option<String>,
) -> HttpResponse {
api_success(
if control_event {
"Inbound webhook control event acknowledged"
} else {
"Inbound webhook sink acknowledged without persistence"
},
json!({
"sinkName": sink_name,
"eventType": event_type,
"externalId": external_id,
"verified": verified,
"handled": false,
"controlEvent": control_event,
"degradedPersistence": true,
"persistedPrimary": persisted_primary,
"reason": reason,
"detail": detail,
}),
)
}
fn log_sink_degraded_acknowledgement(
sink_name: &str,
client_name: &str,
event_type: Option<&str>,
external_id: Option<&str>,
verified: bool,
control_event: bool,
persisted_primary: bool,
reason: &str,
detail: Option<&str>,
) {
warn!(
target: "athena::webhook_sinks",
sink_name = %sink_name,
client_name = %client_name,
event_type = event_type.unwrap_or(""),
external_id = external_id.unwrap_or(""),
verified,
control_event,
persisted_primary,
reason = %reason,
detail = detail.unwrap_or(""),
"Inbound webhook sink acknowledged without persistence"
);
}
fn response_error_message(response: &HttpResponse) -> String {
format!("{} {}", response.status().as_u16(), response.status())
}
async fn log_sink_client_unavailable(
state: &AppState,
configured_client: Option<&str>,
request_client_header: Option<&str>,
sink_name: &str,
resolved_client_name: &str,
target_table_name: Option<&str>,
event_type: Option<&str>,
external_id: Option<&str>,
verified: bool,
response: &HttpResponse,
reason: &str,
) {
let registered_client: Option<RegisteredClient> =
state.pg_registry.registered_client(resolved_client_name);
let live_pool_present = state.pg_registry.get_pool(resolved_client_name).is_some();
let logging_pool_present = state
.logging_client_name
.as_deref()
.is_some_and(|client_name| state.pg_registry.get_pool(client_name).is_some());
let (catalog_has_database_connection, catalog_lookup_error): (Option<bool>, Option<String>) =
if state.gateway_database_backed_client_loading_enabled {
match catalog_client_has_database_connection(state, resolved_client_name).await {
Ok(value) => (Some(value), None),
Err(error) => (None, Some(error)),
}
} else {
(None, None)
};
error!(
target: "athena::webhook_sinks",
sink_name = %sink_name,
configured_client = configured_client.unwrap_or(""),
request_client_header = request_client_header.unwrap_or(""),
resolved_client_name = %resolved_client_name,
target_table_name = target_table_name.unwrap_or(""),
event_type = event_type.unwrap_or(""),
external_id = external_id.unwrap_or(""),
verified,
reason = %reason,
status = %response.status(),
response_detail = %response_error_message(response),
registry_has_live_pool = live_pool_present,
registry_has_registered_client = registered_client.is_some(),
registry_registered_client_name =
registered_client.as_ref().map(|client| client.client_name.as_str()).unwrap_or(""),
registry_registered_source =
registered_client.as_ref().map(|client| client.source.as_str()).unwrap_or(""),
registry_registered_active = ?registered_client.as_ref().map(|client| client.is_active),
registry_registered_frozen = ?registered_client.as_ref().map(|client| client.is_frozen),
registry_registered_pool_connected =
?registered_client.as_ref().map(|client| client.pool_connected),
gateway_database_backed_client_loading_enabled =
state.gateway_database_backed_client_loading_enabled,
logging_client_name = state.logging_client_name.as_deref().unwrap_or(""),
logging_pool_present,
catalog_has_database_connection = ?catalog_has_database_connection,
catalog_lookup_error = catalog_lookup_error.as_deref().unwrap_or(""),
"Inbound webhook sink client is unavailable"
);
}
fn find_sink_definition<'a>(
registry: &'a HashMap<String, InboundWebhookSinkDefinition>,
sink_name: &str,
) -> Option<(&'a str, &'a InboundWebhookSinkDefinition)> {
registry
.iter()
.find(|(name, _)| name.eq_ignore_ascii_case(sink_name.trim()))
.map(|(name, definition)| (name.as_str(), definition))
}
fn expected_shared_secret(
definition: &InboundWebhookSinkDefinition,
) -> Result<Option<String>, HttpResponse> {
if definition.disable_shared_secret_verification {
return Ok(None);
}
let Some(env_var_name) = definition
.shared_secret_env_var
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
else {
return Ok(None);
};
match env::var(env_var_name) {
Ok(value) if !value.trim().is_empty() => Ok(Some(value)),
_ => Err(internal_error(
"Inbound webhook sink secret is not configured",
format!("Environment variable '{env_var_name}' is not set or is blank"),
)),
}
}
fn enforce_sink_athena_base_url(
definition: &InboundWebhookSinkDefinition,
req: &HttpRequest,
sink_name: &str,
) -> Result<(), HttpResponse> {
let Some(expected_base_url) = definition
.athena_base_url
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
else {
return Ok(());
};
let Some(request_base_url) = computed_request_base_url(req) else {
warn!(
target: "athena::webhook_sinks",
sink_name = %sink_name,
expected_base_url = %expected_base_url,
host = header_value_trimmed(req, "host").unwrap_or_default(),
forwarded_host = header_value_trimmed(req, "x-forwarded-host").unwrap_or_default(),
forwarded_proto = header_value_trimmed(req, "x-forwarded-proto").unwrap_or_default(),
"Inbound webhook mirror URL rejected because the request base URL could not be resolved"
);
return Err(forbidden_with_code(
"Inbound webhook mirror URL rejected",
format!(
"Sink '{}' is pinned to Athena base URL '{}' but the request base URL could not be resolved",
sink_name, expected_base_url
),
"WEBHOOK_SINK_ATHENA_BASE_URL_MISMATCH",
));
};
if same_normalized_base_url(expected_base_url, &request_base_url) {
return Ok(());
}
warn!(
target: "athena::webhook_sinks",
sink_name = %sink_name,
expected_base_url = %expected_base_url,
request_base_url = %request_base_url,
host = header_value_trimmed(req, "host").unwrap_or_default(),
forwarded_host = header_value_trimmed(req, "x-forwarded-host").unwrap_or_default(),
forwarded_proto = header_value_trimmed(req, "x-forwarded-proto").unwrap_or_default(),
"Inbound webhook mirror URL rejected because the request reached the wrong Athena base URL"
);
Err(forbidden_with_code(
"Inbound webhook mirror URL rejected",
format!(
"Sink '{}' is pinned to Athena base URL '{}' but this request resolved to '{}'",
sink_name, expected_base_url, request_base_url
),
"WEBHOOK_SINK_ATHENA_BASE_URL_MISMATCH",
))
}
fn computed_request_base_url(req: &HttpRequest) -> Option<String> {
let scheme = header_value_trimmed(req, "x-forwarded-proto")
.or_else(|| forwarded_header_part(req, "proto"))
.unwrap_or_else(|| req.connection_info().scheme().trim().to_string());
let host = header_value_trimmed(req, "x-forwarded-host")
.or_else(|| forwarded_header_part(req, "host"))
.or_else(|| header_value_trimmed(req, "host"))
.or_else(|| {
let value = req.connection_info().host().trim().to_string();
if value.is_empty() { None } else { Some(value) }
})?;
let prefix = header_value_trimmed(req, "x-forwarded-prefix")
.unwrap_or_default()
.trim_end_matches('/')
.to_string();
let suffix = if prefix.is_empty() {
String::new()
} else if prefix.starts_with('/') {
prefix
} else {
format!("/{prefix}")
};
Some(format!("{scheme}://{host}{suffix}"))
}
fn same_normalized_base_url(expected: &str, actual: &str) -> bool {
match (normalize_base_url(expected), normalize_base_url(actual)) {
(Some(left), Some(right)) => left == right,
_ => false,
}
}
fn normalize_base_url(value: &str) -> Option<String> {
let uri = value
.trim()
.trim_end_matches('/')
.parse::<actix_web::http::Uri>()
.ok()?;
let scheme = uri.scheme_str()?.trim().to_ascii_lowercase();
let authority = uri.authority()?;
let host = authority
.host()
.trim()
.trim_end_matches('.')
.to_ascii_lowercase();
if host.is_empty() {
return None;
}
let port = authority
.port_u16()
.filter(|port| !is_default_port(&scheme, *port));
let path = uri.path().trim_end_matches('/');
let normalized_path = if path.is_empty() || path == "/" {
""
} else {
path
};
Some(match port {
Some(port) => format!("{scheme}://{host}:{port}{normalized_path}"),
None => format!("{scheme}://{host}{normalized_path}"),
})
}
fn is_default_port(scheme: &str, port: u16) -> bool {
matches!((scheme, port), ("https", 443) | ("http", 80))
}
fn header_value_trimmed(req: &HttpRequest, name: &str) -> Option<String> {
req.headers()
.get(name)
.and_then(|value| value.to_str().ok())
.map(str::trim)
.filter(|value| !value.is_empty())
.map(str::to_string)
}
fn forwarded_header_part(req: &HttpRequest, key_name: &str) -> Option<String> {
let forwarded = header_value_trimmed(req, "forwarded")?;
forwarded
.split(',')
.flat_map(|entry| entry.split(';'))
.filter_map(|part| part.trim().split_once('='))
.find_map(|(key, value)| {
if !key.trim().eq_ignore_ascii_case(key_name) {
return None;
}
let value = value.trim().trim_matches('"').trim();
if value.is_empty() {
None
} else {
Some(value.to_string())
}
})
}
fn sink_metadata(definition: &InboundWebhookSinkDefinition) -> Result<Value, HttpResponse> {
match definition.metadata.clone().unwrap_or_else(|| json!({})) {
Value::Object(object) => Ok(Value::Object(object)),
_ => Err(internal_error(
"Inbound webhook sink metadata config is invalid",
"Sink metadata must be a JSON object",
)),
}
}
fn map_to_json_object(source: &HashMap<String, String>) -> Map<String, Value> {
let mut object = Map::new();
let mut keys = source.keys().collect::<Vec<_>>();
keys.sort();
for key in keys {
if let Some(value) = source.get(key) {
object.insert(key.clone(), Value::String(value.clone()));
}
}
object
}
fn headers_to_json_object(headers: &actix_web::http::header::HeaderMap) -> Map<String, Value> {
let mut object = Map::new();
for (name, value) in headers {
if let Ok(text) = value.to_str() {
object.insert(
name.as_str().to_ascii_lowercase(),
Value::String(text.to_string()),
);
}
}
object
}
#[cfg(test)]
mod tests {
use super::{list_webhook_sink_templates, post_mollie_alias, post_webhook_alias};
use crate::AppState;
use actix_web::http::StatusCode;
use actix_web::http::header::{HOST, HeaderName, HeaderValue};
use actix_web::{App, body::to_bytes, test, web};
use athena_webhooks::InboundWebhookSinkDefinition;
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
fn state_with_registry() -> web::Data<AppState> {
state_with_registry_definition(InboundWebhookSinkDefinition {
client: Some("suits-formations".to_string()),
schema_name: Some("athena".to_string()),
table_name: "webhook_sink_events".to_string(),
event_type_json_pointer: Some("/type".to_string()),
external_id_json_pointer: Some("/entityId".to_string()),
..Default::default()
})
}
fn state_with_registry_definition(
definition: InboundWebhookSinkDefinition,
) -> web::Data<AppState> {
let mut state = AppState::default();
let mut registry = HashMap::new();
registry.insert("mollie".to_string(), definition);
state.webhook_sink_registry = Some(Arc::new(registry));
web::Data::new(state)
}
#[actix_web::test]
async fn templates_route_lists_loaded_registry_entries() {
let app = test::init_service(
App::new()
.app_data(state_with_registry())
.service(list_webhook_sink_templates),
)
.await;
let request = test::TestRequest::get()
.uri("/webhook-sinks/templates")
.to_request();
let response = test::call_service(&app, request).await;
assert_eq!(response.status(), StatusCode::OK);
let body = to_bytes(response.into_body()).await.expect("response body");
let json: Value = serde_json::from_slice(&body).expect("json response");
assert_eq!(json["data"]["templates"][0]["name"], "mollie");
assert_eq!(
json["data"]["templates"][0]["qualified_table_name"],
"athena.webhook_sink_events"
);
}
#[actix_web::test]
async fn webhooks_alias_routes_into_named_sink() {
let app = test::init_service(
App::new()
.app_data(state_with_registry())
.service(post_webhook_alias),
)
.await;
let request = test::TestRequest::post()
.uri("/webhooks/mollie")
.set_payload(r#"{"type":"payment-link.paid","entityId":"pl_123"}"#)
.to_request();
let response = test::call_service(&app, request).await;
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
let body = to_bytes(response.into_body()).await.expect("response body");
let json: Value = serde_json::from_slice(&body).expect("json response");
assert!(
json["error"]
.as_str()
.unwrap_or_default()
.contains("suits-formations")
);
}
#[actix_web::test]
async fn mollie_shortcut_alias_uses_mollie_sink() {
let app = test::init_service(
App::new()
.app_data(state_with_registry())
.service(post_mollie_alias),
)
.await;
let request = test::TestRequest::post()
.uri("/mollie")
.set_payload(r#"{"type":"payout.initiated","entityId":"payout_123"}"#)
.to_request();
let response = test::call_service(&app, request).await;
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
let body = to_bytes(response.into_body()).await.expect("response body");
let json: Value = serde_json::from_slice(&body).expect("json response");
assert!(
json["error"]
.as_str()
.unwrap_or_default()
.contains("suits-formations")
);
}
#[actix_web::test]
async fn sink_can_disable_shared_secret_verification() {
let app = test::init_service(
App::new()
.app_data(state_with_registry_definition(
InboundWebhookSinkDefinition {
client: Some("suits-formations".to_string()),
schema_name: Some("athena".to_string()),
table_name: "webhook_sink_events".to_string(),
shared_secret_env_var: Some("MOLLIE_WEBHOOK_SINK_SECRET".to_string()),
disable_shared_secret_verification: true,
event_type_json_pointer: Some("/type".to_string()),
external_id_json_pointer: Some("/entityId".to_string()),
..Default::default()
},
))
.service(post_webhook_alias),
)
.await;
let request = test::TestRequest::post()
.uri("/webhooks/mollie")
.set_payload(r#"{"type":"hook.ping","entityId":"hook_123"}"#)
.to_request();
let response = test::call_service(&app, request).await;
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
let body = to_bytes(response.into_body()).await.expect("response body");
let json: Value = serde_json::from_slice(&body).expect("json response");
assert!(
json["error"]
.as_str()
.unwrap_or_default()
.contains("suits-formations")
);
}
#[actix_web::test]
async fn sink_can_acknowledge_without_persistence_when_configured() {
let app = test::init_service(
App::new()
.app_data(state_with_registry_definition(
InboundWebhookSinkDefinition {
client: Some("suits-formations".to_string()),
schema_name: Some("athena".to_string()),
table_name: "webhook_sink_events".to_string(),
disable_shared_secret_verification: true,
acknowledge_without_persistence: true,
event_type_json_pointer: Some("/type".to_string()),
external_id_json_pointer: Some("/entityId".to_string()),
..Default::default()
},
))
.service(post_webhook_alias),
)
.await;
let request = test::TestRequest::post()
.uri("/webhooks/mollie")
.set_payload(r#"{"type":"payout.processing-at-bank","entityId":"payout_123"}"#)
.to_request();
let response = test::call_service(&app, request).await;
assert_eq!(response.status(), StatusCode::OK);
let body = to_bytes(response.into_body()).await.expect("response body");
let json: Value = serde_json::from_slice(&body).expect("json response");
assert_eq!(json["data"]["handled"], Value::Bool(false));
assert_eq!(json["data"]["controlEvent"], Value::Bool(false));
assert_eq!(json["data"]["degradedPersistence"], Value::Bool(true));
assert_eq!(
json["data"]["reason"],
Value::String("primary_client_unavailable".to_string())
);
}
#[actix_web::test]
async fn sink_rejects_requests_from_other_athena_mirrors() {
let app = test::init_service(
App::new()
.app_data(state_with_registry_definition(
InboundWebhookSinkDefinition {
client: Some("suits-formations".to_string()),
athena_base_url: Some("https://mirror2.athena-cluster.com".to_string()),
schema_name: Some("athena".to_string()),
table_name: "webhook_sink_events".to_string(),
disable_shared_secret_verification: true,
event_type_json_pointer: Some("/type".to_string()),
external_id_json_pointer: Some("/entityId".to_string()),
..Default::default()
},
))
.service(post_webhook_alias),
)
.await;
let request = test::TestRequest::post()
.uri("/webhooks/mollie")
.insert_header((HOST, HeaderValue::from_static("mirror3.athena-cluster.com")))
.insert_header((
HeaderName::from_static("x-forwarded-proto"),
HeaderValue::from_static("https"),
))
.set_payload(r#"{"type":"hook.ping","entityId":"hook_123"}"#)
.to_request();
let response = test::call_service(&app, request).await;
assert_eq!(response.status(), StatusCode::FORBIDDEN);
let body = to_bytes(response.into_body()).await.expect("response body");
let json: Value = serde_json::from_slice(&body).expect("json response");
assert_eq!(json["code"], "WEBHOOK_SINK_ATHENA_BASE_URL_MISMATCH");
}
#[actix_web::test]
async fn sink_accepts_forwarded_matching_athena_base_url() {
let app = test::init_service(
App::new()
.app_data(state_with_registry_definition(
InboundWebhookSinkDefinition {
client: Some("suits-formations".to_string()),
athena_base_url: Some("https://mirror2.athena-cluster.com".to_string()),
schema_name: Some("athena".to_string()),
table_name: "webhook_sink_events".to_string(),
disable_shared_secret_verification: true,
event_type_json_pointer: Some("/type".to_string()),
external_id_json_pointer: Some("/entityId".to_string()),
..Default::default()
},
))
.service(post_webhook_alias),
)
.await;
let request = test::TestRequest::post()
.uri("/webhooks/mollie")
.insert_header((HOST, HeaderValue::from_static("internal-athena:5052")))
.insert_header((
HeaderName::from_static("x-forwarded-host"),
HeaderValue::from_static("mirror2.athena-cluster.com"),
))
.insert_header((
HeaderName::from_static("x-forwarded-proto"),
HeaderValue::from_static("https"),
))
.set_payload(r#"{"type":"hook.ping","entityId":"hook_123"}"#)
.to_request();
let response = test::call_service(&app, request).await;
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
let body = to_bytes(response.into_body()).await.expect("response body");
let json: Value = serde_json::from_slice(&body).expect("json response");
assert!(
json["error"]
.as_str()
.unwrap_or_default()
.contains("suits-formations")
);
}
}