use actix_web::{
HttpRequest, HttpResponse, Responder, get, post,
web::{self, Bytes, Data},
};
use athena_billing::providers::billing_signature_header_name;
use serde::Serialize;
use serde_json::{Map, Value, json};
use sha256::digest;
use std::collections::HashMap;
use std::env;
use tracing::{error, info, warn};
use athena_webhooks::{
InboundWebhookBillingEnrichmentDefinition, InboundWebhookSinkDefinition,
InboundWebhookSinkPayload, InboundWebhookSinkRuntimeContext,
WEBHOOK_ERROR_CODE_WEBHOOK_SINK_NOT_FOUND, build_default_inbound_sink_row,
build_projected_inbound_sink_rows, default_secret_header_name, default_secret_query_param,
json_pointer_string, qualify_webhook_sink_table_name, resolve_inbound_selector_string,
resolve_inbound_sink_client_name, summary_from_inbound_sink,
};
use crate::AppState;
use crate::api::client_context::pool_for_client;
use crate::api::response::{
api_success, bad_gateway_with_code, bad_request, bad_request_with_code, forbidden_with_code,
internal_error, internal_error_with_code, not_found_with_code, processed_error,
unauthorized_with_code,
};
use crate::api::webhook_sink_schema_heal::insert_row_with_sink_schema_heal;
use crate::athena::postgres_clients::catalog_client_has_database_connection;
use crate::data::billing::{
BillingStoreError, get_billing_provider_connection,
ingest_and_upsert_billing_webhook_for_connection, resolve_active_billing_provider_connection,
};
use crate::drivers::postgresql::sqlx_driver::RegisteredClient;
const MOLLIE_SINK_NAME: &str = "mollie";
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct SinkBillingEnrichmentStatus {
provider: String,
client_name: String,
connection_id: Option<uuid::Uuid>,
handled: bool,
control_event: bool,
verification_mode: Option<String>,
reason: String,
event_log_id: Option<uuid::Uuid>,
document_kind: Option<String>,
provider_ref: Option<String>,
table: Option<String>,
}
#[get("/webhook-sinks/templates")]
async fn list_webhook_sink_templates(state: Data<AppState>) -> impl Responder {
let mut templates = state
.webhook_sink_registry
.as_ref()
.map(|registry| {
registry
.iter()
.map(|(name, definition)| summary_from_inbound_sink(name, definition))
.collect::<Vec<_>>()
})
.unwrap_or_default();
templates.sort_by(|left, right| left.name.cmp(&right.name));
api_success(
"Listed inbound webhook sink templates",
json!({ "templates": templates }),
)
}
#[post("/webhook-sinks/{sink_name}")]
async fn post_webhook_sink(
sink_name: web::Path<String>,
req: HttpRequest,
body: Bytes,
state: Data<AppState>,
) -> impl Responder {
ingest_sink_request(sink_name.as_str(), req, body, state).await
}
#[post("/webhooks/{sink_name}")]
async fn post_webhook_alias(
sink_name: web::Path<String>,
req: HttpRequest,
body: Bytes,
state: Data<AppState>,
) -> impl Responder {
ingest_sink_request(sink_name.as_str(), req, body, state).await
}
#[post("/mollie")]
async fn post_mollie_alias(req: HttpRequest, body: Bytes, state: Data<AppState>) -> impl Responder {
ingest_sink_request(MOLLIE_SINK_NAME, req, body, state).await
}
pub fn services(cfg: &mut web::ServiceConfig) {
cfg.service(list_webhook_sink_templates)
.service(post_webhook_sink)
.service(post_webhook_alias)
.service(post_mollie_alias);
}
async fn ingest_sink_request(
sink_name: &str,
req: HttpRequest,
body: Bytes,
state: Data<AppState>,
) -> HttpResponse {
let Some(registry) = state.webhook_sink_registry.as_ref() else {
warn!(
target: "athena::webhook_sinks",
sink_name = %sink_name,
"Inbound webhook sink request rejected because no registry is loaded"
);
return not_found_with_code(
"Inbound webhook sink definition was not found",
format!("No inbound webhook sink named '{sink_name}' is loaded"),
WEBHOOK_ERROR_CODE_WEBHOOK_SINK_NOT_FOUND,
);
};
let Some((resolved_sink_name, definition)) = find_sink_definition(registry, sink_name) else {
warn!(
target: "athena::webhook_sinks",
sink_name = %sink_name,
"Inbound webhook sink request rejected because the sink definition was not found"
);
return not_found_with_code(
"Inbound webhook sink definition was not found",
format!("No inbound webhook sink named '{sink_name}' is loaded"),
WEBHOOK_ERROR_CODE_WEBHOOK_SINK_NOT_FOUND,
);
};
if let Err(response) = enforce_sink_athena_base_url(definition, &req, resolved_sink_name) {
return response;
}
let query_params: HashMap<String, String> =
serde_urlencoded::from_str(req.query_string()).unwrap_or_default();
let query_params_json = map_to_json_object(&query_params);
let headers_json = headers_to_json_object(req.headers());
let secret_header = default_secret_header_name(definition);
let secret_query_param = default_secret_query_param(definition);
let provided_secret = req
.headers()
.get(secret_header.as_str())
.and_then(|value| value.to_str().ok())
.map(str::to_string)
.or_else(|| query_params.get(&secret_query_param).cloned());
let expected_secret = match expected_shared_secret(definition) {
Ok(secret) => secret,
Err(response) => return response,
};
let verified = match expected_secret.as_deref() {
Some(expected) => match provided_secret.as_deref() {
Some(provided) if provided == expected => true,
_ => {
let secret_error = if provided_secret.is_some() {
format!(
"Request secret did not match the configured secret for sink '{}'. Expected query param '{}' or header '{}'.",
resolved_sink_name, secret_query_param, secret_header
)
} else {
format!(
"Request secret was missing for sink '{}'. Provide it via query param '{}' or header '{}'.",
resolved_sink_name, secret_query_param, secret_header
)
};
warn!(
target: "athena::webhook_sinks",
sink_name = %resolved_sink_name,
secret_header = %secret_header,
secret_query_param = %secret_query_param,
secret_present = provided_secret.is_some(),
"Inbound webhook sink secret rejected"
);
return unauthorized_with_code(
"Inbound webhook secret rejected",
secret_error,
"BILLING_WEBHOOK_SECRET_INVALID",
);
}
},
None => false,
};
let payload_text = std::str::from_utf8(&body).ok().map(str::to_string);
let payload_json = serde_json::from_slice::<Value>(&body).ok();
let event_type = json_pointer_string(
payload_json.as_ref(),
definition.event_type_json_pointer.as_deref(),
);
let external_id = json_pointer_string(
payload_json.as_ref(),
definition.external_id_json_pointer.as_deref(),
);
let metadata = match sink_metadata(definition) {
Ok(metadata) => metadata,
Err(response) => return response,
};
let client_header = req
.headers()
.get("X-Athena-Client")
.and_then(|value| value.to_str().ok());
let Some(primary_client_name) =
resolve_inbound_sink_client_name(definition.client.as_deref(), client_header)
else {
warn!(
target: "athena::webhook_sinks",
sink_name = %resolved_sink_name,
"Inbound webhook sink request rejected because the client could not be resolved"
);
return bad_request(
"Inbound webhook sink client could not be resolved",
"Set `client` in config/webhook-sinks.yaml or provide X-Athena-Client",
);
};
let is_control_event = is_mollie_hook_ping_event(resolved_sink_name, event_type.as_deref());
let acknowledge_without_persistence = definition.acknowledge_without_persistence;
let primary_pool = match pool_for_client(state.get_ref(), &primary_client_name).await {
Ok(pool) => pool,
Err(response) => {
if acknowledge_without_persistence {
log_sink_degraded_acknowledgement(
resolved_sink_name,
&primary_client_name,
event_type.as_deref(),
external_id.as_deref(),
verified,
is_control_event,
false,
"primary_client_unavailable",
Some(response_error_message(&response).as_str()),
);
return acknowledged_without_persistence_response(
resolved_sink_name,
event_type.as_deref(),
external_id.as_deref(),
verified,
is_control_event,
false,
"primary_client_unavailable",
Some(response_error_message(&response)),
);
}
log_sink_client_unavailable(
state.get_ref(),
definition.client.as_deref(),
client_header,
resolved_sink_name,
&primary_client_name,
None,
event_type.as_deref(),
external_id.as_deref(),
verified,
&response,
"primary_client_unavailable",
)
.await;
return response;
}
};
let qualified_table_name = match qualify_webhook_sink_table_name(
definition.schema_name.as_deref(),
&definition.table_name,
) {
Ok(table_name) => table_name,
Err(error) => {
return internal_error(
"Inbound webhook sink table config is invalid",
format!("sink '{}' is invalid: {error}", resolved_sink_name),
);
}
};
let received_at = chrono::Utc::now();
let request_path = req.path().to_string();
let http_method = req.method().as_str().to_string();
let source_ip = req
.connection_info()
.realip_remote_addr()
.map(str::to_string);
let content_type = req
.headers()
.get("content-type")
.and_then(|value| value.to_str().ok())
.map(str::to_string);
let query_params_value = Value::Object(query_params_json.clone());
let headers_value = Value::Object(headers_json.clone());
let raw_row = build_default_inbound_sink_row(InboundWebhookSinkPayload {
sink_name: resolved_sink_name,
received_at,
request_path: request_path.as_str(),
http_method: http_method.as_str(),
source_ip: source_ip.as_deref(),
content_type: content_type.as_deref(),
query_params_json: query_params_value.clone(),
headers_json: headers_value.clone(),
payload_json: payload_json.clone(),
payload_text: payload_text.as_deref(),
verified,
event_type: event_type.clone(),
external_id: external_id.clone(),
metadata: metadata.clone(),
});
let inserted_row = match insert_row_with_sink_schema_heal(
resolved_sink_name,
&primary_client_name,
&primary_pool,
&qualified_table_name,
&raw_row,
)
.await
{
Ok(row) => row,
Err(error) => {
if acknowledge_without_persistence {
log_sink_degraded_acknowledgement(
resolved_sink_name,
&primary_client_name,
event_type.as_deref(),
external_id.as_deref(),
verified,
is_control_event,
false,
"primary_insert_failed",
Some(error.to_string().as_str()),
);
return acknowledged_without_persistence_response(
resolved_sink_name,
event_type.as_deref(),
external_id.as_deref(),
verified,
is_control_event,
false,
"primary_insert_failed",
Some(error.to_string()),
);
}
error!(
target: "athena::webhook_sinks",
sink_name = %resolved_sink_name,
client_name = %primary_client_name,
event_type = event_type.as_deref().unwrap_or(""),
external_id = external_id.as_deref().unwrap_or(""),
verified,
table_name = %qualified_table_name,
error = %error,
error_chain = %format!("{error:#}"),
"Failed to persist inbound webhook sink event"
);
return internal_error(
"Failed to persist inbound webhook sink event",
error.to_string(),
);
}
};
let payload_sha256 = digest(payload_text.as_deref().unwrap_or_default());
let runtime_context = InboundWebhookSinkRuntimeContext {
sink_name: resolved_sink_name,
received_at,
request_path: request_path.as_str(),
http_method: http_method.as_str(),
source_ip: source_ip.as_deref(),
content_type: content_type.as_deref(),
verified,
event_type: event_type.as_deref(),
external_id: external_id.as_deref(),
query_params_json: &query_params_value,
headers_json: &headers_value,
payload_json: payload_json.as_ref(),
payload_text: payload_text.as_deref(),
payload_sha256: &payload_sha256,
metadata: &metadata,
};
let projected_rows = match build_projected_inbound_sink_rows(definition, &runtime_context) {
Ok(rows) => rows,
Err(error) => {
if acknowledge_without_persistence {
log_sink_degraded_acknowledgement(
resolved_sink_name,
&primary_client_name,
event_type.as_deref(),
external_id.as_deref(),
verified,
is_control_event,
true,
"projection_build_failed",
Some(error.as_str()),
);
return acknowledged_without_persistence_response(
resolved_sink_name,
event_type.as_deref(),
external_id.as_deref(),
verified,
is_control_event,
true,
"projection_build_failed",
Some(error),
);
}
error!(
target: "athena::webhook_sinks",
sink_name = %resolved_sink_name,
client_name = %primary_client_name,
event_type = event_type.as_deref().unwrap_or(""),
external_id = external_id.as_deref().unwrap_or(""),
verified,
error = %error,
"Failed to build inbound webhook sink projection rows"
);
return internal_error(
"Failed to build inbound webhook sink projection rows",
error,
);
}
};
let mut target_inserts = Vec::with_capacity(projected_rows.len());
for (target_client_override, target_table_name, projected_row) in projected_rows {
let Some(target_client_name) = resolve_inbound_sink_client_name(
target_client_override.as_deref(),
Some(&primary_client_name),
) else {
error!(
target: "athena::webhook_sinks",
sink_name = %resolved_sink_name,
client_name = %primary_client_name,
target_table_name = %target_table_name,
event_type = event_type.as_deref().unwrap_or(""),
external_id = external_id.as_deref().unwrap_or(""),
"Inbound webhook sink projection client could not be resolved"
);
return bad_request(
"Inbound webhook sink projection client could not be resolved",
format!(
"sink '{}' could not resolve a client for target table '{}'",
resolved_sink_name, target_table_name
),
);
};
let target_pool = match pool_for_client(state.get_ref(), &target_client_name).await {
Ok(pool) => pool,
Err(response) => {
if acknowledge_without_persistence {
log_sink_degraded_acknowledgement(
resolved_sink_name,
&target_client_name,
event_type.as_deref(),
external_id.as_deref(),
verified,
is_control_event,
true,
"projection_client_unavailable",
Some(response_error_message(&response).as_str()),
);
return acknowledged_without_persistence_response(
resolved_sink_name,
event_type.as_deref(),
external_id.as_deref(),
verified,
is_control_event,
true,
"projection_client_unavailable",
Some(response_error_message(&response)),
);
}
log_sink_client_unavailable(
state.get_ref(),
target_client_override
.as_deref()
.or(definition.client.as_deref()),
Some(&primary_client_name),
resolved_sink_name,
&target_client_name,
Some(&target_table_name),
event_type.as_deref(),
external_id.as_deref(),
verified,
&response,
"projection_client_unavailable",
)
.await;
return response;
}
};
let inserted_target_row = match insert_row_with_sink_schema_heal(
resolved_sink_name,
&target_client_name,
&target_pool,
&target_table_name,
&projected_row,
)
.await
{
Ok(row) => row,
Err(error) => {
if acknowledge_without_persistence {
log_sink_degraded_acknowledgement(
resolved_sink_name,
&target_client_name,
event_type.as_deref(),
external_id.as_deref(),
verified,
is_control_event,
true,
"projection_insert_failed",
Some(error.to_string().as_str()),
);
return acknowledged_without_persistence_response(
resolved_sink_name,
event_type.as_deref(),
external_id.as_deref(),
verified,
is_control_event,
true,
"projection_insert_failed",
Some(error.to_string()),
);
}
error!(
target: "athena::webhook_sinks",
sink_name = %resolved_sink_name,
client_name = %target_client_name,
target_table_name = %target_table_name,
event_type = event_type.as_deref().unwrap_or(""),
external_id = external_id.as_deref().unwrap_or(""),
verified,
error = %error,
error_chain = %format!("{error:#}"),
"Failed to persist inbound webhook sink projection row"
);
return internal_error(
"Failed to persist inbound webhook sink projection row",
format!(
"sink '{}' target '{}' failed: {}",
resolved_sink_name, target_table_name, error
),
);
}
};
target_inserts.push(json!({
"clientName": target_client_name,
"tableName": target_table_name,
"row": inserted_target_row
}));
}
let billing_enrichment = match persist_sink_billing_enrichment(
state.get_ref(),
definition,
&runtime_context,
&req,
body.as_ref(),
&primary_client_name,
&primary_pool,
)
.await
{
Ok(result) => result,
Err(response) => return response,
};
info!(
target: "athena::webhook_sinks",
sink_name = %resolved_sink_name,
client_name = %primary_client_name,
event_type = event_type.as_deref().unwrap_or(""),
external_id = external_id.as_deref().unwrap_or(""),
verified,
control_event = is_control_event,
degraded_persistence = false,
target_insert_count = target_inserts.len(),
billing_enrichment_handled = billing_enrichment.as_ref().map(|value| value.handled),
billing_enrichment_reason =
billing_enrichment.as_ref().map(|value| value.reason.as_str()).unwrap_or(""),
"Inbound webhook sink event stored"
);
api_success(
"Stored inbound webhook sink event",
json!({
"sink": summary_from_inbound_sink(resolved_sink_name, definition),
"verified": verified,
"eventType": event_type,
"externalId": external_id,
"row": inserted_row,
"targetInserts": target_inserts,
"billingEnrichment": billing_enrichment
}),
)
}
async fn persist_sink_billing_enrichment(
state: &AppState,
definition: &InboundWebhookSinkDefinition,
runtime_context: &InboundWebhookSinkRuntimeContext<'_>,
req: &HttpRequest,
body: &[u8],
primary_client_name: &str,
primary_pool: &sqlx::PgPool,
) -> Result<Option<SinkBillingEnrichmentStatus>, HttpResponse> {
let Some(enrichment) = definition.billing_enrichment.as_ref() else {
return Ok(None);
};
let provider = enrichment.provider.trim().to_ascii_lowercase();
if provider.is_empty() {
return Err(internal_error(
"Inbound webhook sink billing enrichment config is invalid",
"billing_enrichment.provider must not be blank",
));
}
let (client_name, pool) =
resolve_sink_billing_enrichment_pool(state, enrichment, primary_client_name, primary_pool)
.await?;
let signature_headers = billing_signature_headers(req, &provider);
let control_event = provider == "mollie" && runtime_context.event_type == Some("hook.ping");
if control_event && signature_headers.is_empty() {
let status = SinkBillingEnrichmentStatus {
provider: provider.clone(),
client_name: client_name.clone(),
connection_id: None,
handled: false,
control_event: true,
verification_mode: Some("unsigned_control_event".to_string()),
reason: "unsigned_hook_ping_ignored".to_string(),
event_log_id: None,
document_kind: None,
provider_ref: None,
table: None,
};
info!(
target: "athena::webhook_sinks",
sink_name = %runtime_context.sink_name,
client_name = %client_name,
provider = %provider,
event_type = runtime_context.event_type.unwrap_or(""),
external_id = runtime_context.external_id.unwrap_or(""),
reason = %status.reason,
"Inbound webhook sink skipped canonical billing enrichment"
);
return Ok(Some(status));
}
let connection =
resolve_sink_billing_enrichment_connection(&pool, enrichment, runtime_context).await?;
let persistence = ingest_and_upsert_billing_webhook_for_connection(
&pool,
connection.id,
body,
&signature_headers,
)
.await
.map_err(|error| {
error!(
target: "athena::webhook_sinks",
sink_name = %runtime_context.sink_name,
client_name = %client_name,
provider = %provider,
connection_id = %connection.id,
event_type = runtime_context.event_type.unwrap_or(""),
external_id = runtime_context.external_id.unwrap_or(""),
verified = runtime_context.verified,
error = %error,
error_chain = %format!("{error:#}"),
"Inbound webhook sink billing enrichment failed"
);
sink_billing_store_error_response(error)
})?;
let status = match persistence {
Some(persistence) => SinkBillingEnrichmentStatus {
provider: provider.clone(),
client_name: client_name.clone(),
connection_id: Some(persistence.connection.id),
handled: true,
control_event: false,
verification_mode: Some(persistence.webhook_event.verification_mode.clone()),
reason: "upserted".to_string(),
event_log_id: Some(persistence.event_log_id),
document_kind: Some(
match &persistence.document {
athena_billing::canonical::CanonicalBillingDocument::Payment(_) => "payment",
athena_billing::canonical::CanonicalBillingDocument::Subscription(_) => {
"subscription"
}
athena_billing::canonical::CanonicalBillingDocument::Invoice(_) => "invoice",
}
.to_string(),
),
provider_ref: Some(persistence.upsert.provider_ref.clone()),
table: Some(persistence.upsert.table.clone()),
},
None => SinkBillingEnrichmentStatus {
provider: provider.clone(),
client_name: client_name.clone(),
connection_id: Some(connection.id),
handled: false,
control_event,
verification_mode: None,
reason: "no_canonical_document".to_string(),
event_log_id: None,
document_kind: None,
provider_ref: None,
table: None,
},
};
info!(
target: "athena::webhook_sinks",
sink_name = %runtime_context.sink_name,
client_name = %client_name,
provider = %provider,
connection_id = %connection.id,
event_type = runtime_context.event_type.unwrap_or(""),
external_id = runtime_context.external_id.unwrap_or(""),
handled = status.handled,
reason = %status.reason,
document_kind = status.document_kind.as_deref().unwrap_or(""),
provider_ref = status.provider_ref.as_deref().unwrap_or(""),
table = status.table.as_deref().unwrap_or(""),
"Inbound webhook sink canonical billing enrichment finished"
);
Ok(Some(status))
}
async fn resolve_sink_billing_enrichment_pool(
state: &AppState,
enrichment: &InboundWebhookBillingEnrichmentDefinition,
primary_client_name: &str,
primary_pool: &sqlx::PgPool,
) -> Result<(String, sqlx::PgPool), HttpResponse> {
let Some(client_name) =
resolve_inbound_sink_client_name(enrichment.client.as_deref(), Some(primary_client_name))
else {
return Err(internal_error(
"Inbound webhook sink billing enrichment config is invalid",
"billing_enrichment client could not be resolved",
));
};
if client_name == primary_client_name {
return Ok((client_name, primary_pool.clone()));
}
let pool = pool_for_client(state, &client_name).await?;
Ok((client_name, pool))
}
async fn resolve_sink_billing_enrichment_connection(
pool: &sqlx::PgPool,
enrichment: &InboundWebhookBillingEnrichmentDefinition,
runtime_context: &InboundWebhookSinkRuntimeContext<'_>,
) -> Result<crate::data::billing::BillingProviderConnectionRecord, HttpResponse> {
if let Some(connection_id_selector) = enrichment.connection_id.as_ref() {
let Some(connection_id) =
resolve_inbound_selector_string(connection_id_selector, runtime_context).map_err(
|error| {
internal_error(
"Inbound webhook sink billing enrichment config is invalid",
error,
)
},
)?
else {
return Err(internal_error(
"Inbound webhook sink billing enrichment config is invalid",
"billing_enrichment.connection_id selector resolved to an empty value",
));
};
let connection_id = connection_id.parse::<uuid::Uuid>().map_err(|_| {
internal_error(
"Inbound webhook sink billing enrichment config is invalid",
format!(
"billing_enrichment.connection_id selector resolved to invalid UUID '{}'",
connection_id
),
)
})?;
let connection = get_billing_provider_connection(pool, connection_id)
.await
.map_err(sink_billing_store_error_response)?;
if !connection
.provider
.eq_ignore_ascii_case(enrichment.provider.as_str())
{
return Err(internal_error_with_code(
"Inbound webhook sink billing enrichment config is invalid",
format!(
"Billing connection '{}' is configured for provider '{}', not '{}'.",
connection.id, connection.provider, enrichment.provider
),
"BILLING_WEBHOOK_SINK_PROVIDER_MISMATCH",
));
}
return Ok(connection);
}
let mode = enrichment
.mode
.as_ref()
.map(|selector| resolve_inbound_selector_string(selector, runtime_context))
.transpose()
.map_err(|error| {
internal_error(
"Inbound webhook sink billing enrichment config is invalid",
error,
)
})?
.flatten();
let provider_profile_id = enrichment
.provider_profile_id
.as_ref()
.map(|selector| resolve_inbound_selector_string(selector, runtime_context))
.transpose()
.map_err(|error| {
internal_error(
"Inbound webhook sink billing enrichment config is invalid",
error,
)
})?
.flatten();
match resolve_active_billing_provider_connection(
pool,
&enrichment.provider,
mode.as_deref(),
provider_profile_id.as_deref(),
)
.await
.map_err(sink_billing_store_error_response)?
{
Some(connection) => Ok(connection),
None => Err(internal_error_with_code(
"Inbound webhook sink billing enrichment connection was not found",
format!(
"No active billing connection matched provider '{}'{}{}.",
enrichment.provider,
mode.as_deref()
.map(|value| format!(", mode '{}'", value))
.unwrap_or_default(),
provider_profile_id
.as_deref()
.map(|value| format!(", provider_profile_id '{}'", value))
.unwrap_or_default()
),
"BILLING_WEBHOOK_SINK_CONNECTION_NOT_FOUND",
)),
}
}
fn billing_signature_headers(req: &HttpRequest, provider: &str) -> Vec<String> {
let Some(target_header) = billing_signature_header_name(provider) else {
return Vec::new();
};
req.headers()
.iter()
.filter(|(name, _)| name.as_str().eq_ignore_ascii_case(target_header))
.filter_map(|(_, value)| value.to_str().ok().map(ToString::to_string))
.collect()
}
fn sink_billing_store_error_response(error: BillingStoreError) -> HttpResponse {
match error {
BillingStoreError::NotFound => internal_error_with_code(
"Inbound webhook sink billing enrichment failed",
"Billing provider connection was not found.",
"BILLING_CONNECTION_NOT_FOUND",
),
BillingStoreError::UnsupportedProvider(provider) => bad_request_with_code(
"Inbound webhook sink billing enrichment failed",
format!("Billing provider '{provider}' is not supported."),
"BILLING_PROVIDER_UNSUPPORTED",
),
BillingStoreError::InvalidConfig(details) => internal_error_with_code(
"Inbound webhook sink billing enrichment failed",
details,
"BILLING_CONNECTION_CONFIG_INVALID",
),
BillingStoreError::WebhookRejected { provider, details } => unauthorized_with_code(
"Inbound webhook sink billing enrichment failed",
format!(
"Billing webhook was rejected for provider '{}': {}",
provider, details
),
if provider.eq_ignore_ascii_case("stripe") {
"STRIPE_WEBHOOK_REJECTED"
} else {
"MOLLIE_WEBHOOK_REJECTED"
},
),
BillingStoreError::UnsupportedWebhook(details) => bad_request_with_code(
"Inbound webhook sink billing enrichment failed",
details,
"BILLING_WEBHOOK_PAYLOAD_UNSUPPORTED",
),
BillingStoreError::Parse(details) => bad_gateway_with_code(
"Inbound webhook sink billing enrichment failed",
details,
"BILLING_PROVIDER_RESPONSE_INVALID",
),
BillingStoreError::Http(details) => bad_gateway_with_code(
"Inbound webhook sink billing enrichment failed",
details,
"BILLING_PROVIDER_HTTP_FAILED",
),
BillingStoreError::Adapter(details) => bad_gateway_with_code(
"Inbound webhook sink billing enrichment failed",
details,
"BILLING_ADAPTER_FAILED",
),
BillingStoreError::Database(error) => {
processed_error(crate::error::sqlx_parser::process_sqlx_error(&error))
}
}
}
fn is_mollie_hook_ping_event(sink_name: &str, event_type: Option<&str>) -> bool {
sink_name.eq_ignore_ascii_case(MOLLIE_SINK_NAME) && event_type == Some("hook.ping")
}
fn acknowledged_without_persistence_response(
sink_name: &str,
event_type: Option<&str>,
external_id: Option<&str>,
verified: bool,
control_event: bool,
persisted_primary: bool,
reason: &str,
detail: Option<String>,
) -> HttpResponse {
api_success(
if control_event {
"Inbound webhook control event acknowledged"
} else {
"Inbound webhook sink acknowledged without persistence"
},
json!({
"sinkName": sink_name,
"eventType": event_type,
"externalId": external_id,
"verified": verified,
"handled": false,
"controlEvent": control_event,
"degradedPersistence": true,
"persistedPrimary": persisted_primary,
"reason": reason,
"detail": detail,
}),
)
}
fn log_sink_degraded_acknowledgement(
sink_name: &str,
client_name: &str,
event_type: Option<&str>,
external_id: Option<&str>,
verified: bool,
control_event: bool,
persisted_primary: bool,
reason: &str,
detail: Option<&str>,
) {
warn!(
target: "athena::webhook_sinks",
sink_name = %sink_name,
client_name = %client_name,
event_type = event_type.unwrap_or(""),
external_id = external_id.unwrap_or(""),
verified,
control_event,
persisted_primary,
reason = %reason,
detail = detail.unwrap_or(""),
"Inbound webhook sink acknowledged without persistence"
);
}
fn response_error_message(response: &HttpResponse) -> String {
format!("{} {}", response.status().as_u16(), response.status())
}
async fn log_sink_client_unavailable(
state: &AppState,
configured_client: Option<&str>,
request_client_header: Option<&str>,
sink_name: &str,
resolved_client_name: &str,
target_table_name: Option<&str>,
event_type: Option<&str>,
external_id: Option<&str>,
verified: bool,
response: &HttpResponse,
reason: &str,
) {
let registered_client: Option<RegisteredClient> =
state.pg_registry.registered_client(resolved_client_name);
let live_pool_present = state.pg_registry.get_pool(resolved_client_name).is_some();
let logging_pool_present = state
.logging_client_name
.as_deref()
.is_some_and(|client_name| state.pg_registry.get_pool(client_name).is_some());
let (catalog_has_database_connection, catalog_lookup_error): (Option<bool>, Option<String>) =
if state.gateway_database_backed_client_loading_enabled {
match catalog_client_has_database_connection(state, resolved_client_name).await {
Ok(value) => (Some(value), None),
Err(error) => (None, Some(error)),
}
} else {
(None, None)
};
error!(
target: "athena::webhook_sinks",
sink_name = %sink_name,
configured_client = configured_client.unwrap_or(""),
request_client_header = request_client_header.unwrap_or(""),
resolved_client_name = %resolved_client_name,
target_table_name = target_table_name.unwrap_or(""),
event_type = event_type.unwrap_or(""),
external_id = external_id.unwrap_or(""),
verified,
reason = %reason,
status = %response.status(),
response_detail = %response_error_message(response),
registry_has_live_pool = live_pool_present,
registry_has_registered_client = registered_client.is_some(),
registry_registered_client_name =
registered_client.as_ref().map(|client| client.client_name.as_str()).unwrap_or(""),
registry_registered_source =
registered_client.as_ref().map(|client| client.source.as_str()).unwrap_or(""),
registry_registered_active = ?registered_client.as_ref().map(|client| client.is_active),
registry_registered_frozen = ?registered_client.as_ref().map(|client| client.is_frozen),
registry_registered_pool_connected =
?registered_client.as_ref().map(|client| client.pool_connected),
gateway_database_backed_client_loading_enabled =
state.gateway_database_backed_client_loading_enabled,
logging_client_name = state.logging_client_name.as_deref().unwrap_or(""),
logging_pool_present,
catalog_has_database_connection = ?catalog_has_database_connection,
catalog_lookup_error = catalog_lookup_error.as_deref().unwrap_or(""),
"Inbound webhook sink client is unavailable"
);
}
fn find_sink_definition<'a>(
registry: &'a HashMap<String, InboundWebhookSinkDefinition>,
sink_name: &str,
) -> Option<(&'a str, &'a InboundWebhookSinkDefinition)> {
registry
.iter()
.find(|(name, _)| name.eq_ignore_ascii_case(sink_name.trim()))
.map(|(name, definition)| (name.as_str(), definition))
}
fn expected_shared_secret(
definition: &InboundWebhookSinkDefinition,
) -> Result<Option<String>, HttpResponse> {
if definition.disable_shared_secret_verification {
return Ok(None);
}
let Some(env_var_name) = definition
.shared_secret_env_var
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
else {
return Ok(None);
};
match env::var(env_var_name) {
Ok(value) if !value.trim().is_empty() => Ok(Some(value)),
_ => Err(internal_error(
"Inbound webhook sink secret is not configured",
format!("Environment variable '{env_var_name}' is not set or is blank"),
)),
}
}
fn enforce_sink_athena_base_url(
definition: &InboundWebhookSinkDefinition,
req: &HttpRequest,
sink_name: &str,
) -> Result<(), HttpResponse> {
let Some(expected_base_url) = definition
.athena_base_url
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
else {
return Ok(());
};
let Some(request_base_url) = computed_request_base_url(req) else {
warn!(
target: "athena::webhook_sinks",
sink_name = %sink_name,
expected_base_url = %expected_base_url,
host = header_value_trimmed(req, "host").unwrap_or_default(),
forwarded_host = header_value_trimmed(req, "x-forwarded-host").unwrap_or_default(),
forwarded_proto = header_value_trimmed(req, "x-forwarded-proto").unwrap_or_default(),
"Inbound webhook mirror URL rejected because the request base URL could not be resolved"
);
return Err(forbidden_with_code(
"Inbound webhook mirror URL rejected",
format!(
"Sink '{}' is pinned to Athena base URL '{}' but the request base URL could not be resolved",
sink_name, expected_base_url
),
"WEBHOOK_SINK_ATHENA_BASE_URL_MISMATCH",
));
};
if same_normalized_base_url(expected_base_url, &request_base_url) {
return Ok(());
}
warn!(
target: "athena::webhook_sinks",
sink_name = %sink_name,
expected_base_url = %expected_base_url,
request_base_url = %request_base_url,
host = header_value_trimmed(req, "host").unwrap_or_default(),
forwarded_host = header_value_trimmed(req, "x-forwarded-host").unwrap_or_default(),
forwarded_proto = header_value_trimmed(req, "x-forwarded-proto").unwrap_or_default(),
"Inbound webhook mirror URL rejected because the request reached the wrong Athena base URL"
);
Err(forbidden_with_code(
"Inbound webhook mirror URL rejected",
format!(
"Sink '{}' is pinned to Athena base URL '{}' but this request resolved to '{}'",
sink_name, expected_base_url, request_base_url
),
"WEBHOOK_SINK_ATHENA_BASE_URL_MISMATCH",
))
}
fn computed_request_base_url(req: &HttpRequest) -> Option<String> {
let scheme = header_value_trimmed(req, "x-forwarded-proto")
.or_else(|| forwarded_header_part(req, "proto"))
.unwrap_or_else(|| req.connection_info().scheme().trim().to_string());
let host = header_value_trimmed(req, "x-forwarded-host")
.or_else(|| forwarded_header_part(req, "host"))
.or_else(|| header_value_trimmed(req, "host"))
.or_else(|| {
let value = req.connection_info().host().trim().to_string();
if value.is_empty() { None } else { Some(value) }
})?;
let prefix = header_value_trimmed(req, "x-forwarded-prefix")
.unwrap_or_default()
.trim_end_matches('/')
.to_string();
let suffix = if prefix.is_empty() {
String::new()
} else if prefix.starts_with('/') {
prefix
} else {
format!("/{prefix}")
};
Some(format!("{scheme}://{host}{suffix}"))
}
fn same_normalized_base_url(expected: &str, actual: &str) -> bool {
match (normalize_base_url(expected), normalize_base_url(actual)) {
(Some(left), Some(right)) => left == right,
_ => false,
}
}
fn normalize_base_url(value: &str) -> Option<String> {
let uri = value
.trim()
.trim_end_matches('/')
.parse::<actix_web::http::Uri>()
.ok()?;
let scheme = uri.scheme_str()?.trim().to_ascii_lowercase();
let authority = uri.authority()?;
let host = authority
.host()
.trim()
.trim_end_matches('.')
.to_ascii_lowercase();
if host.is_empty() {
return None;
}
let port = authority
.port_u16()
.filter(|port| !is_default_port(&scheme, *port));
let path = uri.path().trim_end_matches('/');
let normalized_path = if path.is_empty() || path == "/" {
""
} else {
path
};
Some(match port {
Some(port) => format!("{scheme}://{host}:{port}{normalized_path}"),
None => format!("{scheme}://{host}{normalized_path}"),
})
}
fn is_default_port(scheme: &str, port: u16) -> bool {
matches!((scheme, port), ("https", 443) | ("http", 80))
}
fn header_value_trimmed(req: &HttpRequest, name: &str) -> Option<String> {
req.headers()
.get(name)
.and_then(|value| value.to_str().ok())
.map(str::trim)
.filter(|value| !value.is_empty())
.map(str::to_string)
}
fn forwarded_header_part(req: &HttpRequest, key_name: &str) -> Option<String> {
let forwarded = header_value_trimmed(req, "forwarded")?;
forwarded
.split(',')
.flat_map(|entry| entry.split(';'))
.filter_map(|part| part.trim().split_once('='))
.find_map(|(key, value)| {
if !key.trim().eq_ignore_ascii_case(key_name) {
return None;
}
let value = value.trim().trim_matches('"').trim();
if value.is_empty() {
None
} else {
Some(value.to_string())
}
})
}
fn sink_metadata(definition: &InboundWebhookSinkDefinition) -> Result<Value, HttpResponse> {
match definition.metadata.clone().unwrap_or_else(|| json!({})) {
Value::Object(object) => Ok(Value::Object(object)),
_ => Err(internal_error(
"Inbound webhook sink metadata config is invalid",
"Sink metadata must be a JSON object",
)),
}
}
fn map_to_json_object(source: &HashMap<String, String>) -> Map<String, Value> {
let mut object = Map::new();
let mut keys = source.keys().collect::<Vec<_>>();
keys.sort();
for key in keys {
if let Some(value) = source.get(key) {
object.insert(key.clone(), Value::String(value.clone()));
}
}
object
}
fn headers_to_json_object(headers: &actix_web::http::header::HeaderMap) -> Map<String, Value> {
let mut object = Map::new();
for (name, value) in headers {
if let Ok(text) = value.to_str() {
object.insert(
name.as_str().to_ascii_lowercase(),
Value::String(text.to_string()),
);
}
}
object
}
#[cfg(test)]
mod tests {
use super::{list_webhook_sink_templates, post_mollie_alias, post_webhook_alias};
use crate::AppState;
use actix_web::http::StatusCode;
use actix_web::http::header::{HOST, HeaderName, HeaderValue};
use actix_web::{App, body::to_bytes, test, web};
use athena_webhooks::InboundWebhookSinkDefinition;
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
fn state_with_registry() -> web::Data<AppState> {
state_with_registry_definition(InboundWebhookSinkDefinition {
client: Some("suits-formations".to_string()),
schema_name: Some("athena".to_string()),
table_name: "webhook_sink_events".to_string(),
event_type_json_pointer: Some("/type".to_string()),
external_id_json_pointer: Some("/entityId".to_string()),
..Default::default()
})
}
fn state_with_registry_definition(
definition: InboundWebhookSinkDefinition,
) -> web::Data<AppState> {
let mut state = AppState::default();
let mut registry = HashMap::new();
registry.insert("mollie".to_string(), definition);
state.webhook_sink_registry = Some(Arc::new(registry));
web::Data::new(state)
}
#[actix_web::test]
async fn templates_route_lists_loaded_registry_entries() {
let app = test::init_service(
App::new()
.app_data(state_with_registry())
.service(list_webhook_sink_templates),
)
.await;
let request = test::TestRequest::get()
.uri("/webhook-sinks/templates")
.to_request();
let response = test::call_service(&app, request).await;
assert_eq!(response.status(), StatusCode::OK);
let body = to_bytes(response.into_body()).await.expect("response body");
let json: Value = serde_json::from_slice(&body).expect("json response");
assert_eq!(json["data"]["templates"][0]["name"], "mollie");
assert_eq!(
json["data"]["templates"][0]["qualified_table_name"],
"athena.webhook_sink_events"
);
}
#[actix_web::test]
async fn webhooks_alias_routes_into_named_sink() {
let app = test::init_service(
App::new()
.app_data(state_with_registry())
.service(post_webhook_alias),
)
.await;
let request = test::TestRequest::post()
.uri("/webhooks/mollie")
.set_payload(r#"{"type":"payment-link.paid","entityId":"pl_123"}"#)
.to_request();
let response = test::call_service(&app, request).await;
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
let body = to_bytes(response.into_body()).await.expect("response body");
let json: Value = serde_json::from_slice(&body).expect("json response");
assert!(
json["error"]
.as_str()
.unwrap_or_default()
.contains("suits-formations")
);
}
#[actix_web::test]
async fn mollie_shortcut_alias_uses_mollie_sink() {
let app = test::init_service(
App::new()
.app_data(state_with_registry())
.service(post_mollie_alias),
)
.await;
let request = test::TestRequest::post()
.uri("/mollie")
.set_payload(r#"{"type":"payout.initiated","entityId":"payout_123"}"#)
.to_request();
let response = test::call_service(&app, request).await;
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
let body = to_bytes(response.into_body()).await.expect("response body");
let json: Value = serde_json::from_slice(&body).expect("json response");
assert!(
json["error"]
.as_str()
.unwrap_or_default()
.contains("suits-formations")
);
}
#[actix_web::test]
async fn sink_can_disable_shared_secret_verification() {
let app = test::init_service(
App::new()
.app_data(state_with_registry_definition(
InboundWebhookSinkDefinition {
client: Some("suits-formations".to_string()),
schema_name: Some("athena".to_string()),
table_name: "webhook_sink_events".to_string(),
shared_secret_env_var: Some("MOLLIE_WEBHOOK_SINK_SECRET".to_string()),
disable_shared_secret_verification: true,
event_type_json_pointer: Some("/type".to_string()),
external_id_json_pointer: Some("/entityId".to_string()),
..Default::default()
},
))
.service(post_webhook_alias),
)
.await;
let request = test::TestRequest::post()
.uri("/webhooks/mollie")
.set_payload(r#"{"type":"hook.ping","entityId":"hook_123"}"#)
.to_request();
let response = test::call_service(&app, request).await;
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
let body = to_bytes(response.into_body()).await.expect("response body");
let json: Value = serde_json::from_slice(&body).expect("json response");
assert!(
json["error"]
.as_str()
.unwrap_or_default()
.contains("suits-formations")
);
}
#[actix_web::test]
async fn sink_can_acknowledge_without_persistence_when_configured() {
let app = test::init_service(
App::new()
.app_data(state_with_registry_definition(
InboundWebhookSinkDefinition {
client: Some("suits-formations".to_string()),
schema_name: Some("athena".to_string()),
table_name: "webhook_sink_events".to_string(),
disable_shared_secret_verification: true,
acknowledge_without_persistence: true,
event_type_json_pointer: Some("/type".to_string()),
external_id_json_pointer: Some("/entityId".to_string()),
..Default::default()
},
))
.service(post_webhook_alias),
)
.await;
let request = test::TestRequest::post()
.uri("/webhooks/mollie")
.set_payload(r#"{"type":"payout.processing-at-bank","entityId":"payout_123"}"#)
.to_request();
let response = test::call_service(&app, request).await;
assert_eq!(response.status(), StatusCode::OK);
let body = to_bytes(response.into_body()).await.expect("response body");
let json: Value = serde_json::from_slice(&body).expect("json response");
assert_eq!(json["data"]["handled"], Value::Bool(false));
assert_eq!(json["data"]["controlEvent"], Value::Bool(false));
assert_eq!(json["data"]["degradedPersistence"], Value::Bool(true));
assert_eq!(
json["data"]["reason"],
Value::String("primary_client_unavailable".to_string())
);
}
#[actix_web::test]
async fn sink_rejects_requests_from_other_athena_mirrors() {
let app = test::init_service(
App::new()
.app_data(state_with_registry_definition(
InboundWebhookSinkDefinition {
client: Some("suits-formations".to_string()),
athena_base_url: Some("https://mirror2.athena-cluster.com".to_string()),
schema_name: Some("athena".to_string()),
table_name: "webhook_sink_events".to_string(),
disable_shared_secret_verification: true,
event_type_json_pointer: Some("/type".to_string()),
external_id_json_pointer: Some("/entityId".to_string()),
..Default::default()
},
))
.service(post_webhook_alias),
)
.await;
let request = test::TestRequest::post()
.uri("/webhooks/mollie")
.insert_header((HOST, HeaderValue::from_static("mirror3.athena-cluster.com")))
.insert_header((
HeaderName::from_static("x-forwarded-proto"),
HeaderValue::from_static("https"),
))
.set_payload(r#"{"type":"hook.ping","entityId":"hook_123"}"#)
.to_request();
let response = test::call_service(&app, request).await;
assert_eq!(response.status(), StatusCode::FORBIDDEN);
let body = to_bytes(response.into_body()).await.expect("response body");
let json: Value = serde_json::from_slice(&body).expect("json response");
assert_eq!(json["code"], "WEBHOOK_SINK_ATHENA_BASE_URL_MISMATCH");
}
#[actix_web::test]
async fn sink_accepts_forwarded_matching_athena_base_url() {
let app = test::init_service(
App::new()
.app_data(state_with_registry_definition(
InboundWebhookSinkDefinition {
client: Some("suits-formations".to_string()),
athena_base_url: Some("https://mirror2.athena-cluster.com".to_string()),
schema_name: Some("athena".to_string()),
table_name: "webhook_sink_events".to_string(),
disable_shared_secret_verification: true,
event_type_json_pointer: Some("/type".to_string()),
external_id_json_pointer: Some("/entityId".to_string()),
..Default::default()
},
))
.service(post_webhook_alias),
)
.await;
let request = test::TestRequest::post()
.uri("/webhooks/mollie")
.insert_header((HOST, HeaderValue::from_static("internal-athena:5052")))
.insert_header((
HeaderName::from_static("x-forwarded-host"),
HeaderValue::from_static("mirror2.athena-cluster.com"),
))
.insert_header((
HeaderName::from_static("x-forwarded-proto"),
HeaderValue::from_static("https"),
))
.set_payload(r#"{"type":"hook.ping","entityId":"hook_123"}"#)
.to_request();
let response = test::call_service(&app, request).await;
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
let body = to_bytes(response.into_body()).await.expect("response body");
let json: Value = serde_json::from_slice(&body).expect("json response");
assert!(
json["error"]
.as_str()
.unwrap_or_default()
.contains("suits-formations")
);
}
}