use actix_web::{
HttpRequest, HttpResponse, Responder, delete, get, patch, post,
web::{self, Bytes, Data, Path, Query},
};
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use sqlx::postgres::PgPool;
use std::collections::{BTreeSet, HashMap};
use uuid::Uuid;
use crate::AppState;
use crate::api::auth::authorize_static_admin_key;
use crate::api::client_context::pool_for_client;
use crate::api::response::{api_success, bad_request, error_response_with_code};
use crate::data::billing::{
BillingProviderConnectionListFilters, BillingProviderConnectionRecord, BillingStoreError,
BillingWebhookEventListFilters, CreateBillingProviderConnectionInput,
UpdateBillingProviderConnectionInput, billing_capabilities_for_record,
billing_reconcile_response_payload, billing_webhook_response_payload,
create_billing_provider_connection, delete_billing_provider_connection,
get_billing_provider_connection, ingest_and_upsert_billing_webhook_for_connection,
list_billing_provider_connections, list_billing_webhook_events,
reconcile_billing_document_for_connection, update_billing_provider_connection,
};
use crate::data::billing_auth_sync::{
BillingAuthSyncConfig, BillingAuthSyncError, billing_auth_sync_failure_status,
billing_auth_sync_not_configured_status, dispatch_billing_auth_right_sync,
};
use crate::webhooks::{
GatewayWebhookTrigger, ROUTE_BILLING_DOCUMENT_UPSERTED, spawn_gateway_webhook_sink_dispatch,
};
use athena_billing::BillingConnectionMode;
use athena_billing::contracts::{
billing_athena_auth_contract,
billing_connection_request_examples as shared_billing_connection_request_examples,
billing_connection_seed_sql_paths, billing_document_kinds as shared_billing_document_kinds,
billing_grant_descriptor_views as shared_billing_grant_descriptor_views,
billing_grants_contract, billing_integration_bootstrap_contract,
billing_providers_integration_contract,
billing_reconcile_resource_kinds as shared_billing_reconcile_resource_kinds,
billing_sink_integration_contract,
};
use athena_billing::providers::{
BillingProviderRefetchRequest, BillingProviderResourceKind, billing_provider_catalog,
billing_signature_header_name, enrich_connection_config_for_provider,
validate_connection_config_for_provider,
};
use athena_billing::sinks::{
BillingSinkCreateParams, BillingSinkHelperDefinition, BillingSinkResolvedDefinition,
DEFAULT_BILLING_SINK_TARGET_SCHEMA,
billing_sink_resolved_definitions as shared_billing_sink_resolved_definitions,
is_valid_sql_identifier, render_billing_sink_seed_sql as shared_render_billing_sink_seed_sql,
render_billing_target_schema_sql as shared_render_billing_target_schema_sql,
};
use athena_billing::{billing_dialect_right_groups, billing_translated_rights};
use athena_webhooks::WEBHOOK_ERROR_CODE_WEBHOOK_SINK_PROVISION_FAILED;
use athena_webhooks::sinks::{
CreateGatewayWebhookSinkParams, GatewayWebhookSinkRecord, upsert_gateway_webhook_sink,
};
const BILLING_TABLES_SQL: &str =
include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/sql/billing.sql"));
const BILLING_WEBHOOK_SINKS_SEED_SQL: &str = include_str!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/sql/billing_webhook_sinks_seed.sql"
));
const BILLING_TARGET_SCHEMA_BILLING_EXAMPLE_SQL: &str = include_str!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/sql/billing_target_schema_billing_example.sql"
));
const BILLING_SEED_MOLLIE_EXAMPLE_SQL: &str = include_str!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/sql/billing_seed_mollie_example.sql"
));
const BILLING_SEED_STRIPE_EXAMPLE_SQL: &str = include_str!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/sql/billing_seed_stripe_example.sql"
));
#[derive(Debug, Deserialize)]
struct BillingWebhookPath {
provider: String,
client_name: String,
connection_id: Uuid,
}
#[derive(Debug, Deserialize)]
struct BillingConnectionPath {
client_name: String,
connection_id: Uuid,
}
#[derive(Debug, Deserialize)]
struct BillingConnectionsClientPath {
client_name: String,
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct BillingConnectionsListQuery {
#[serde(default)]
owner_kind: Option<String>,
#[serde(default)]
owner_id: Option<String>,
#[serde(default)]
provider: Option<String>,
#[serde(default)]
mode: Option<String>,
#[serde(default)]
include_deleted: Option<bool>,
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct BillingSinkHelperQuery {
#[serde(alias = "client_name")]
client_name: String,
#[serde(default, alias = "target_client_name")]
target_client_name: Option<String>,
#[serde(default, alias = "target_schema")]
target_schema: Option<String>,
#[serde(default, alias = "athena_base_url")]
athena_base_url: Option<String>,
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct BillingWebhookEventsListQuery {
#[serde(default)]
connection_id: Option<Uuid>,
#[serde(default)]
provider: Option<String>,
#[serde(default)]
limit: Option<i64>,
#[serde(default)]
offset: Option<i64>,
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ProvisionBillingSinksRequest {
#[serde(default)]
target_client_name: Option<String>,
#[serde(default)]
target_schema: Option<String>,
#[serde(default)]
athena_base_url: Option<String>,
#[serde(default)]
enabled: Option<bool>,
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct CreateBillingConnectionRequest {
owner_kind: String,
owner_id: String,
provider: String,
#[serde(default)]
mode: Option<String>,
#[serde(default)]
status: Option<String>,
#[serde(default)]
credential_kind: Option<String>,
provider_account_id: String,
#[serde(default)]
provider_profile_id: Option<String>,
#[serde(default)]
scopes: Option<Value>,
#[serde(default)]
config: Option<Value>,
#[serde(default)]
metadata: Option<Value>,
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct UpdateBillingConnectionRequest {
#[serde(default)]
owner_kind: Option<String>,
#[serde(default)]
owner_id: Option<String>,
#[serde(default)]
mode: Option<String>,
#[serde(default)]
status: Option<String>,
#[serde(default)]
credential_kind: Option<String>,
#[serde(default)]
provider_account_id: Option<String>,
#[serde(default)]
provider_profile_id: Option<Option<String>>,
#[serde(default)]
scopes: Option<Value>,
#[serde(default)]
config: Option<Value>,
#[serde(default)]
metadata: Option<Value>,
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ReconcileBillingDocumentRequest {
resource_kind: BillingProviderResourceKind,
provider_ref: String,
#[serde(default)]
provider_customer_ref: Option<String>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct BillingProviderConnectionView {
#[serde(flatten)]
connection: BillingProviderConnectionRecord,
capabilities: Value,
webhook_path: String,
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct BillingClientNameResolution {
requested: String,
registry: String,
}
fn billing_auth_sync_error_response(
message: &str,
error: &BillingAuthSyncError,
) -> actix_web::HttpResponse {
match error {
BillingAuthSyncError::InvalidConfig(details) => error_response_with_code(
actix_web::http::StatusCode::INTERNAL_SERVER_ERROR,
message,
details.clone(),
"BILLING_AUTH_SYNC_CONFIG_INVALID",
None,
),
BillingAuthSyncError::Request(details) => error_response_with_code(
actix_web::http::StatusCode::BAD_GATEWAY,
message,
details.clone(),
"BILLING_AUTH_SYNC_FAILED",
None,
),
BillingAuthSyncError::Rejected { status, body } => error_response_with_code(
actix_web::http::StatusCode::BAD_GATEWAY,
message,
format!("Athena Auth responded with status {status}: {body}"),
"BILLING_AUTH_SYNC_FAILED",
Some(json!({
"auth_status": status,
"auth_response_body": body
})),
),
}
}
fn billing_route_success_data(
base_payload: Value,
extras: &[(&str, Value)],
) -> serde_json::Map<String, Value> {
let mut response_data = match base_payload.clone() {
Value::Object(map) => map,
_ => serde_json::Map::new(),
};
for (key, value) in extras {
response_data.insert((*key).to_string(), value.clone());
}
response_data.insert("artifacts".to_string(), base_payload);
response_data
}
fn billing_store_error_response(
message: &str,
error: BillingStoreError,
) -> actix_web::HttpResponse {
match error {
BillingStoreError::NotFound => error_response_with_code(
actix_web::http::StatusCode::NOT_FOUND,
message,
"Billing provider connection was not found.",
"BILLING_CONNECTION_NOT_FOUND",
None,
),
BillingStoreError::UnsupportedProvider(provider) => error_response_with_code(
actix_web::http::StatusCode::BAD_REQUEST,
message,
format!("Billing provider '{provider}' is not supported."),
"BILLING_PROVIDER_UNSUPPORTED",
Some(json!({ "provider": provider })),
),
BillingStoreError::InvalidConfig(details) => error_response_with_code(
actix_web::http::StatusCode::INTERNAL_SERVER_ERROR,
message,
details,
"BILLING_CONNECTION_CONFIG_INVALID",
None,
),
BillingStoreError::WebhookRejected { provider, details } => {
let code = if provider.eq_ignore_ascii_case("stripe") {
"STRIPE_WEBHOOK_REJECTED"
} else {
"MOLLIE_WEBHOOK_REJECTED"
};
error_response_with_code(
actix_web::http::StatusCode::UNAUTHORIZED,
message,
details,
code,
Some(json!({ "provider": provider })),
)
}
BillingStoreError::UnsupportedWebhook(details) => error_response_with_code(
actix_web::http::StatusCode::UNPROCESSABLE_ENTITY,
message,
details,
"BILLING_WEBHOOK_PAYLOAD_UNSUPPORTED",
None,
),
BillingStoreError::Parse(details) => error_response_with_code(
actix_web::http::StatusCode::BAD_GATEWAY,
message,
details,
"BILLING_PROVIDER_RESPONSE_INVALID",
None,
),
BillingStoreError::Http(details) => error_response_with_code(
actix_web::http::StatusCode::BAD_GATEWAY,
message,
details,
"BILLING_PROVIDER_HTTP_FAILED",
None,
),
BillingStoreError::Adapter(details) => error_response_with_code(
actix_web::http::StatusCode::BAD_GATEWAY,
message,
details,
"BILLING_ADAPTER_FAILED",
None,
),
BillingStoreError::Database(error) => crate::api::response::processed_error(
crate::error::sqlx_parser::process_sqlx_error(&error),
),
}
}
fn request_headers(req: &HttpRequest) -> Vec<(String, String)> {
req.headers()
.iter()
.filter_map(|(name, value)| {
value
.to_str()
.ok()
.map(|value| (name.as_str().to_string(), value.to_string()))
})
.collect()
}
fn signature_headers(req: &HttpRequest, provider: &str) -> Vec<String> {
let Some(target_header) = billing_signature_header_name(provider) else {
return Vec::new();
};
req.headers()
.iter()
.filter(|(name, _)| name.as_str().eq_ignore_ascii_case(target_header))
.filter_map(|(_, value)| value.to_str().ok().map(ToString::to_string))
.collect()
}
fn resolve_billing_client_name(
client_name: &str,
) -> Result<BillingClientNameResolution, actix_web::HttpResponse> {
let requested = client_name.trim().to_string();
if requested.is_empty() {
return Err(bad_request(
"Invalid client_name",
"client_name must not be empty.",
));
}
Ok(BillingClientNameResolution {
registry: requested.replace('_', "-"),
requested,
})
}
async fn resolve_billing_client_pool(
app_state: &AppState,
client_name: &str,
) -> Result<(BillingClientNameResolution, PgPool), actix_web::HttpResponse> {
let resolution = resolve_billing_client_name(client_name)?;
let pool = pool_for_client(app_state, &resolution.registry).await?;
Ok((resolution, pool))
}
fn connection_metadata_string(metadata: &Value, pointers: &[&str]) -> Option<String> {
pointers.iter().find_map(|pointer| {
metadata
.pointer(pointer)
.and_then(Value::as_str)
.map(str::trim)
.filter(|value| !value.is_empty())
.map(str::to_string)
})
}
fn connection_metadata_bool(metadata: &Value, pointers: &[&str]) -> bool {
pointers
.iter()
.find_map(|pointer| metadata.pointer(pointer).and_then(Value::as_bool))
.unwrap_or(false)
}
fn billing_webhook_athena_base_url(connection: &BillingProviderConnectionRecord) -> Option<String> {
connection_metadata_string(
&connection.metadata,
&[
"/webhookIngress/athenaBaseUrl",
"/webhookIngress/athena_base_url",
],
)
}
fn billing_webhook_allows_unsigned_hook_ping(connection: &BillingProviderConnectionRecord) -> bool {
connection_metadata_bool(
&connection.metadata,
&[
"/webhookIngress/allowUnsignedHookPing",
"/webhookIngress/allow_unsigned_hook_ping",
],
)
}
fn normalize_base_url(value: &str) -> Option<String> {
let uri = value
.trim()
.trim_end_matches('/')
.parse::<actix_web::http::Uri>()
.ok()?;
let scheme = uri.scheme_str()?.trim().to_ascii_lowercase();
let authority = uri.authority()?;
let host = authority
.host()
.trim()
.trim_end_matches('.')
.to_ascii_lowercase();
if host.is_empty() {
return None;
}
let port = authority
.port_u16()
.filter(|port| !matches!((scheme.as_str(), *port), ("https", 443) | ("http", 80)));
let path = uri.path().trim_end_matches('/');
let normalized_path = if path.is_empty() || path == "/" {
""
} else {
path
};
Some(match port {
Some(port) => format!("{scheme}://{host}:{port}{normalized_path}"),
None => format!("{scheme}://{host}{normalized_path}"),
})
}
fn computed_request_base_url(req: &HttpRequest) -> Option<String> {
let scheme = header_value_trimmed(req, "x-forwarded-proto")
.or_else(|| forwarded_header_part(req, "proto"))
.unwrap_or_else(|| req.connection_info().scheme().trim().to_string());
let host = header_value_trimmed(req, "x-forwarded-host")
.or_else(|| forwarded_header_part(req, "host"))
.or_else(|| header_value_trimmed(req, "host"))
.or_else(|| {
let value = req.connection_info().host().trim().to_string();
if value.is_empty() { None } else { Some(value) }
})?;
let prefix = header_value_trimmed(req, "x-forwarded-prefix")
.unwrap_or_default()
.trim_end_matches('/')
.to_string();
let suffix = if prefix.is_empty() {
String::new()
} else if prefix.starts_with('/') {
prefix
} else {
format!("/{prefix}")
};
Some(format!("{scheme}://{host}{suffix}"))
}
fn header_value_trimmed(req: &HttpRequest, name: &str) -> Option<String> {
req.headers()
.get(name)
.and_then(|value| value.to_str().ok())
.map(str::trim)
.filter(|value| !value.is_empty())
.map(str::to_string)
}
fn forwarded_header_part(req: &HttpRequest, key_name: &str) -> Option<String> {
let forwarded = header_value_trimmed(req, "forwarded")?;
forwarded
.split(',')
.flat_map(|entry| entry.split(';'))
.filter_map(|part| part.trim().split_once('='))
.find_map(|(key, value)| {
if !key.trim().eq_ignore_ascii_case(key_name) {
return None;
}
let value = value.trim().trim_matches('"').trim();
if value.is_empty() {
None
} else {
Some(value.to_string())
}
})
}
fn same_normalized_base_url(expected: &str, actual: &str) -> bool {
match (normalize_base_url(expected), normalize_base_url(actual)) {
(Some(left), Some(right)) => left == right,
_ => false,
}
}
fn enforce_billing_webhook_athena_base_url(
req: &HttpRequest,
connection: &BillingProviderConnectionRecord,
requested_client_name: &str,
) -> Result<(), actix_web::HttpResponse> {
let Some(expected_base_url) = billing_webhook_athena_base_url(connection) else {
return Ok(());
};
let Some(request_base_url) = computed_request_base_url(req) else {
return Err(error_response_with_code(
actix_web::http::StatusCode::FORBIDDEN,
"Billing webhook mirror URL rejected",
format!(
"Connection '{}' is pinned to Athena base URL '{}' but the request base URL could not be resolved.",
connection.id, expected_base_url
),
"BILLING_WEBHOOK_BASE_URL_MISMATCH",
Some(json!({
"connection_id": connection.id,
"client_name": requested_client_name,
"expected_athena_base_url": expected_base_url
})),
));
};
if same_normalized_base_url(&expected_base_url, &request_base_url) {
return Ok(());
}
Err(error_response_with_code(
actix_web::http::StatusCode::FORBIDDEN,
"Billing webhook mirror URL rejected",
format!(
"Connection '{}' is pinned to Athena base URL '{}' but this request resolved to '{}'.",
connection.id, expected_base_url, request_base_url
),
"BILLING_WEBHOOK_BASE_URL_MISMATCH",
Some(json!({
"connection_id": connection.id,
"client_name": requested_client_name,
"expected_athena_base_url": expected_base_url,
"request_base_url": request_base_url
})),
))
}
fn maybe_unsigned_mollie_hook_ping_response(
provider: &str,
requested_client_name: &str,
connection: &BillingProviderConnectionRecord,
body: &[u8],
signature_headers: &[String],
) -> Option<Value> {
if !provider.eq_ignore_ascii_case("mollie")
|| !signature_headers.is_empty()
|| !billing_webhook_allows_unsigned_hook_ping(connection)
{
return None;
}
let payload: Value = serde_json::from_slice(body).ok()?;
if payload.get("type").and_then(Value::as_str) != Some("hook.ping") {
return None;
}
Some(json!({
"provider": provider,
"client_name": requested_client_name,
"connection_id": connection.id,
"handled": false,
"eventType": "hook.ping",
"controlEvent": true,
"verificationMode": "unsigned_hook_ping_override"
}))
}
fn billing_sink_definitions(
client_name: &str,
target_client_name: &str,
target_schema: &str,
athena_base_url: Option<&str>,
) -> Vec<BillingSinkResolvedDefinition> {
shared_billing_sink_resolved_definitions(
client_name,
target_client_name,
ROUTE_BILLING_DOCUMENT_UPSERTED,
target_schema,
athena_base_url,
true,
)
}
fn billing_sink_helpers(
client_name: &str,
target_client_name: &str,
target_schema: &str,
athena_base_url: Option<&str>,
) -> Vec<BillingSinkHelperDefinition> {
billing_sink_definitions(
client_name,
target_client_name,
target_schema,
athena_base_url,
)
.into_iter()
.map(|definition| definition.helper)
.collect()
}
fn billing_sink_create_params(
client_name: &str,
target_client_name: &str,
target_schema: &str,
athena_base_url: Option<&str>,
enabled: bool,
) -> Vec<CreateGatewayWebhookSinkParams> {
shared_billing_sink_resolved_definitions(
client_name,
target_client_name,
ROUTE_BILLING_DOCUMENT_UPSERTED,
target_schema,
athena_base_url,
enabled,
)
.into_iter()
.map(|definition: BillingSinkResolvedDefinition| {
let params: BillingSinkCreateParams = definition.create_params;
CreateGatewayWebhookSinkParams {
name: params.name,
description: params.description,
enabled: params.enabled,
athena_base_url: params.athena_base_url,
client_name: params.client_name,
table_name: Some(params.table_name),
route_key: params.route_key,
target_client_name: params.target_client_name,
target_table: params.target_table,
target_write_mode: Some(params.target_write_mode),
target_conflict_columns: Some(params.target_conflict_columns),
projection_template: params.projection_template,
include_request_body_in_context: params.include_request_body_in_context,
lookup_keys: params.lookup_keys,
slug: Some(params.slug),
state_key_template: params.state_key_template,
state_cap_max_fires: params.state_cap_max_fires,
state_cap_window_seconds: params.state_cap_window_seconds,
idempotency_template: params.idempotency_template,
state_resolution_mode: params.state_resolution_mode,
state_resolution_query_builder: params.state_resolution_query_builder,
}
})
.collect()
}
fn resolved_billing_sink_target_schema(value: Option<&str>) -> Result<String, HttpResponse> {
let normalized = value
.map(str::trim)
.filter(|schema| !schema.is_empty())
.unwrap_or(DEFAULT_BILLING_SINK_TARGET_SCHEMA);
if !is_valid_sql_identifier(normalized) {
return Err(bad_request(
"Invalid targetSchema",
"targetSchema must be a valid SQL identifier.",
));
}
Ok(normalized.to_string())
}
fn billing_native_right_views() -> Vec<athena_rights::AthenaRightDescriptor> {
athena_rights::billing_right_catalog()
}
fn billing_resolved_native_rights_for_keys(
keys: impl IntoIterator<Item = String>,
) -> Vec<athena_rights::AthenaRightDescriptor> {
athena_rights::resolve_right_selection(
BTreeSet::<String>::from_iter(keys)
.iter()
.map(String::as_str),
&billing_native_right_views(),
)
.native_rights
}
fn billing_resolved_native_right_modules_for_keys(
keys: impl IntoIterator<Item = String>,
) -> Vec<athena_rights::AthenaRightModule> {
athena_rights::resolve_right_module_selection(
BTreeSet::<String>::from_iter(keys)
.iter()
.map(String::as_str),
&billing_native_right_views(),
)
.modules
}
fn billing_resolved_right_keys_for_keys(keys: impl IntoIterator<Item = String>) -> Vec<String> {
athena_rights::resolve_right_selection(
BTreeSet::<String>::from_iter(keys)
.iter()
.map(String::as_str),
&billing_native_right_views(),
)
.resolved_keys
}
fn billing_unknown_right_keys_for_keys(keys: impl IntoIterator<Item = String>) -> Vec<String> {
athena_rights::resolve_right_selection(
BTreeSet::<String>::from_iter(keys)
.iter()
.map(String::as_str),
&billing_native_right_views(),
)
.unknown_keys
}
fn billing_right_resolution_payload(keys: &[String]) -> Value {
json!({
"grantKeys": keys,
"rightKeys": billing_resolved_right_keys_for_keys(keys.iter().cloned()),
"nativeRights": billing_resolved_native_rights_for_keys(keys.iter().cloned()),
"nativeRightModules": billing_resolved_native_right_modules_for_keys(keys.iter().cloned()),
"unknownRightKeys": billing_unknown_right_keys_for_keys(keys.iter().cloned()),
})
}
fn billing_grants_payload() -> Value {
let grants_contract = billing_grants_contract(ROUTE_BILLING_DOCUMENT_UPSERTED);
let native_rights = billing_native_right_views();
let dialect_rights = billing_dialect_right_groups();
let translated_rights = billing_translated_rights();
let counts = json!({
"grants": grants_contract.counts.grants,
"nativeRights": native_rights.len(),
"translatedRights": translated_rights.len(),
"sourceKinds": grants_contract.counts.source_kinds,
"reconcileResourceKinds": grants_contract.counts.reconcile_resource_kinds,
});
let mut payload =
serde_json::to_value(grants_contract).expect("billing grants contract should serialize");
let Value::Object(payload_map) = &mut payload else {
unreachable!("billing grants contract should serialize to a JSON object");
};
let connections_contract = payload_map
.get("connections")
.cloned()
.expect("billing grants contract should expose connections");
let webhook_sinks_contract = payload_map
.get("webhookSinks")
.cloned()
.expect("billing grants contract should expose webhook sinks");
payload_map.insert("nativeRights".to_string(), json!(native_rights));
payload_map.insert("dialectRights".to_string(), json!(dialect_rights));
payload_map.insert("translatedRights".to_string(), json!(translated_rights));
payload_map.insert(
"nativeRightModules".to_string(),
json!(billing_resolved_native_right_modules_for_keys(
shared_billing_grant_descriptor_views()
.iter()
.map(|grant| grant.key.to_string())
.collect::<Vec<_>>()
)),
);
payload_map.insert("counts".to_string(), counts);
payload_map.insert("tableSql".to_string(), json!(billing_tables_sql()));
payload_map.insert(
"route_key".to_string(),
json!(ROUTE_BILLING_DOCUMENT_UPSERTED),
);
payload_map.insert(
"routeKey".to_string(),
json!(ROUTE_BILLING_DOCUMENT_UPSERTED),
);
payload_map.insert(
"connections".to_string(),
json!({
"list_endpoint": connections_contract["list_endpoint"],
"create_endpoint": connections_contract["create_endpoint"],
"get_endpoint": connections_contract["get_endpoint"],
"update_endpoint": connections_contract["update_endpoint"],
"delete_endpoint": connections_contract["delete_endpoint"],
"reconcile_endpoint": connections_contract["reconcile_endpoint"],
"webhook_events_endpoint": connections_contract["webhook_events_endpoint"],
"seed_sql": billing_connection_seed_sql_paths(),
"seed_sql_examples": billing_connection_seed_sql_examples(),
"request_examples": billing_connection_request_examples(),
"table_sql": "sql/billing.sql"
}),
);
payload_map.insert(
"webhook_sinks".to_string(),
json!({
"helper_endpoint": webhook_sinks_contract["helper_endpoint"],
"provision_endpoint": webhook_sinks_contract["provision_endpoint"],
"seed_sql": "sql/billing_webhook_sinks_seed.sql",
"table_sql": "sql/billing.sql",
"target_table_sql_example": "sql/billing_target_schema_billing_example.sql"
}),
);
payload_map.insert(
"athena_auth".to_string(),
json!(billing_athena_auth_contract()),
);
payload_map.insert(
"bootstrap".to_string(),
json!(billing_integration_bootstrap_contract(
ROUTE_BILLING_DOCUMENT_UPSERTED
)),
);
payload
}
fn billing_tables_sql() -> &'static str {
BILLING_TABLES_SQL
}
fn billing_webhook_sinks_seed_sql() -> &'static str {
BILLING_WEBHOOK_SINKS_SEED_SQL
}
fn billing_target_schema_billing_example_sql() -> &'static str {
BILLING_TARGET_SCHEMA_BILLING_EXAMPLE_SQL
}
fn billing_seed_mollie_example_sql() -> &'static str {
BILLING_SEED_MOLLIE_EXAMPLE_SQL
}
fn billing_seed_stripe_example_sql() -> &'static str {
BILLING_SEED_STRIPE_EXAMPLE_SQL
}
fn billing_connection_seed_sql_examples() -> Value {
json!({
"mollie": billing_seed_mollie_example_sql(),
"stripe": billing_seed_stripe_example_sql()
})
}
fn billing_connection_request_examples() -> Value {
shared_billing_connection_request_examples()
}
fn normalize_required_text_field(
name: &str,
value: &str,
) -> Result<String, actix_web::HttpResponse> {
let trimmed = value.trim();
if trimmed.is_empty() {
return Err(bad_request(
format!("Invalid {name}"),
format!("{name} must not be empty."),
));
}
Ok(trimmed.to_string())
}
fn normalize_optional_text_field(value: Option<&str>) -> Option<String> {
value
.map(str::trim)
.filter(|value| !value.is_empty())
.map(str::to_string)
}
fn normalize_json_object_field(
name: &str,
value: Option<Value>,
) -> Result<Value, actix_web::HttpResponse> {
match value {
Some(Value::Object(map)) => Ok(Value::Object(map)),
Some(other) => Err(bad_request(
format!("Invalid {name}"),
format!("{name} must be a JSON object, got {other}."),
)),
None => Ok(json!({})),
}
}
fn normalize_json_array_field(
name: &str,
value: Option<Value>,
) -> Result<Value, actix_web::HttpResponse> {
match value {
Some(Value::Array(items)) => Ok(Value::Array(items)),
Some(other) => Err(bad_request(
format!("Invalid {name}"),
format!("{name} must be a JSON array, got {other}."),
)),
None => Ok(json!([])),
}
}
fn enrich_billing_connection_config(
provider: &str,
mode: &str,
credential_kind: &str,
provider_profile_id: Option<&str>,
config: Value,
) -> Result<Value, actix_web::HttpResponse> {
let Some(mode) = BillingConnectionMode::parse(mode) else {
return Err(bad_request(
"Invalid mode",
format!("Unsupported billing connection mode '{mode}'."),
));
};
enrich_connection_config_for_provider(
provider,
mode,
credential_kind,
provider_profile_id,
config,
)
.map_err(|error| bad_request("Invalid config", error.to_string()))
}
fn billing_connection_webhook_path(
client_name: &str,
connection: &BillingProviderConnectionRecord,
) -> String {
format!(
"/billing/providers/{}/clients/{}/connections/{}/webhook",
connection.provider, client_name, connection.id
)
}
fn default_billing_credential_kind(provider: &str) -> &'static str {
match provider.trim().to_ascii_lowercase().as_str() {
"stripe" => "secret_key",
_ => "api_key",
}
}
fn resolve_updated_billing_connection_config(
current: &BillingProviderConnectionRecord,
mode: Option<&str>,
credential_kind: Option<&str>,
provider_profile_id: Option<Option<&str>>,
config_override: Option<Value>,
) -> Result<Option<Value>, actix_web::HttpResponse> {
let should_rebuild = mode.is_some()
|| credential_kind.is_some()
|| provider_profile_id.is_some()
|| config_override.is_some();
if !should_rebuild {
return Ok(None);
}
let effective_mode = mode.unwrap_or(current.mode.as_str());
let effective_credential_kind = credential_kind.unwrap_or(current.credential_kind.as_str());
let effective_profile_id = provider_profile_id.unwrap_or(
current
.provider_profile_id
.as_deref()
.map(Some)
.unwrap_or(None),
);
let base_config = config_override.unwrap_or_else(|| current.config.clone());
let config = enrich_billing_connection_config(
current.provider.as_str(),
effective_mode,
effective_credential_kind,
effective_profile_id,
base_config,
)?;
validate_connection_config_for_provider(current.provider.as_str(), &config)
.map_err(|error| bad_request("Invalid config", error.to_string()))?;
Ok(Some(config))
}
fn billing_connection_view(
client_name: &str,
connection: BillingProviderConnectionRecord,
) -> Result<BillingProviderConnectionView, actix_web::HttpResponse> {
let capabilities = billing_capabilities_for_record(&connection).map_err(|error| {
billing_store_error_response("Failed to derive billing connection capabilities", error)
})?;
Ok(BillingProviderConnectionView {
webhook_path: billing_connection_webhook_path(client_name, &connection),
capabilities: serde_json::to_value(capabilities).unwrap_or_else(|_| json!({})),
connection,
})
}
#[get("/admin/billing/clients/{client_name}/connections")]
async fn admin_list_billing_connections(
req: HttpRequest,
app_state: Data<AppState>,
path: Path<BillingConnectionsClientPath>,
query: Query<BillingConnectionsListQuery>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let client_name = path.client_name.trim().to_string();
if client_name.is_empty() {
return bad_request("Invalid client_name", "client_name must not be empty.");
}
let pool = match pool_for_client(app_state.get_ref(), &client_name).await {
Ok(pool) => pool,
Err(resp) => return resp,
};
let filters = BillingProviderConnectionListFilters {
owner_kind: normalize_optional_text_field(query.owner_kind.as_deref()),
owner_id: normalize_optional_text_field(query.owner_id.as_deref()),
provider: normalize_optional_text_field(query.provider.as_deref()),
mode: normalize_optional_text_field(query.mode.as_deref()),
include_deleted: query.include_deleted.unwrap_or(false),
};
let connections = match list_billing_provider_connections(&pool, &filters).await {
Ok(connections) => connections,
Err(error) => {
return billing_store_error_response("Failed to list billing connections", error);
}
};
let connections = match connections
.into_iter()
.map(|connection| billing_connection_view(&client_name, connection))
.collect::<Result<Vec<_>, _>>()
{
Ok(connections) => connections,
Err(resp) => return resp,
};
api_success(
"Listed billing connections",
json!({
"clientName": client_name,
"connections": connections
}),
)
}
#[post("/admin/billing/clients/{client_name}/connections")]
async fn admin_create_billing_connection(
req: HttpRequest,
app_state: Data<AppState>,
path: Path<BillingConnectionsClientPath>,
body: web::Json<CreateBillingConnectionRequest>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let client_name = path.client_name.trim().to_string();
if client_name.is_empty() {
return bad_request("Invalid client_name", "client_name must not be empty.");
}
let pool = match pool_for_client(app_state.get_ref(), &client_name).await {
Ok(pool) => pool,
Err(resp) => return resp,
};
let provider = match normalize_required_text_field("provider", &body.provider) {
Ok(provider) => provider.to_ascii_lowercase(),
Err(resp) => return resp,
};
let mode =
normalize_optional_text_field(body.mode.as_deref()).unwrap_or_else(|| "live".to_string());
let status = normalize_optional_text_field(body.status.as_deref())
.unwrap_or_else(|| "pending".to_string());
let credential_kind = normalize_optional_text_field(body.credential_kind.as_deref())
.unwrap_or_else(|| default_billing_credential_kind(&provider).to_string());
let provider_profile_id = normalize_optional_text_field(body.provider_profile_id.as_deref());
let scopes = match normalize_json_array_field("scopes", body.scopes.clone()) {
Ok(scopes) => scopes,
Err(resp) => return resp,
};
let raw_config = match normalize_json_object_field("config", body.config.clone()) {
Ok(config) => config,
Err(resp) => return resp,
};
let config = match enrich_billing_connection_config(
provider.as_str(),
mode.as_str(),
credential_kind.as_str(),
provider_profile_id.as_deref(),
raw_config,
) {
Ok(config) => config,
Err(resp) => return resp,
};
let metadata = match normalize_json_object_field("metadata", body.metadata.clone()) {
Ok(metadata) => metadata,
Err(resp) => return resp,
};
let input = CreateBillingProviderConnectionInput {
owner_kind: match normalize_required_text_field("ownerKind", &body.owner_kind) {
Ok(value) => value.to_ascii_lowercase(),
Err(resp) => return resp,
},
owner_id: match normalize_required_text_field("ownerId", &body.owner_id) {
Ok(value) => value,
Err(resp) => return resp,
},
provider,
mode,
status,
credential_kind,
provider_account_id: match normalize_required_text_field(
"providerAccountId",
&body.provider_account_id,
) {
Ok(value) => value,
Err(resp) => return resp,
},
provider_profile_id,
scopes,
config,
metadata,
};
let connection = match create_billing_provider_connection(&pool, &input).await {
Ok(connection) => connection,
Err(error) => {
return billing_store_error_response("Failed to create billing connection", error);
}
};
let connection = match billing_connection_view(&client_name, connection) {
Ok(connection) => connection,
Err(resp) => return resp,
};
api_success(
"Created billing connection",
json!({
"clientName": client_name,
"connection": connection
}),
)
}
#[get("/admin/billing/clients/{client_name}/connections/{connection_id}")]
async fn admin_get_billing_connection(
req: HttpRequest,
app_state: Data<AppState>,
path: Path<BillingConnectionPath>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let path = path.into_inner();
let client_name = path.client_name.trim().to_string();
if client_name.is_empty() {
return bad_request("Invalid client_name", "client_name must not be empty.");
}
let pool = match pool_for_client(app_state.get_ref(), &client_name).await {
Ok(pool) => pool,
Err(resp) => return resp,
};
let connection = match get_billing_provider_connection(&pool, path.connection_id).await {
Ok(connection) => connection,
Err(error) => {
return billing_store_error_response("Failed to load billing connection", error);
}
};
let connection = match billing_connection_view(&client_name, connection) {
Ok(connection) => connection,
Err(resp) => return resp,
};
api_success(
"Loaded billing connection",
json!({
"clientName": client_name,
"connection": connection
}),
)
}
#[patch("/admin/billing/clients/{client_name}/connections/{connection_id}")]
async fn admin_update_billing_connection(
req: HttpRequest,
app_state: Data<AppState>,
path: Path<BillingConnectionPath>,
body: web::Json<UpdateBillingConnectionRequest>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let path = path.into_inner();
let client_name = path.client_name.trim().to_string();
if client_name.is_empty() {
return bad_request("Invalid client_name", "client_name must not be empty.");
}
let pool = match pool_for_client(app_state.get_ref(), &client_name).await {
Ok(pool) => pool,
Err(resp) => return resp,
};
let current = match get_billing_provider_connection(&pool, path.connection_id).await {
Ok(connection) => connection,
Err(error) => {
return billing_store_error_response("Failed to load billing connection", error);
}
};
let mode = body
.mode
.as_deref()
.and_then(|value| normalize_optional_text_field(Some(value)));
let credential_kind = body
.credential_kind
.as_deref()
.and_then(|value| normalize_optional_text_field(Some(value)));
let provider_profile_id = body
.provider_profile_id
.clone()
.map(|value| value.and_then(|inner| normalize_optional_text_field(Some(inner.as_str()))));
let scopes = match body.scopes.clone() {
Some(value) => match normalize_json_array_field("scopes", Some(value)) {
Ok(scopes) => Some(scopes),
Err(resp) => return resp,
},
None => None,
};
let config_override = match body.config.clone() {
Some(config) => match normalize_json_object_field("config", Some(config)) {
Ok(config) => Some(config),
Err(resp) => return resp,
},
None => None,
};
let config = match resolve_updated_billing_connection_config(
¤t,
mode.as_deref(),
credential_kind.as_deref(),
provider_profile_id.as_ref().map(|value| value.as_deref()),
config_override,
) {
Ok(config) => config,
Err(resp) => return resp,
};
let metadata = match body.metadata.clone() {
Some(value) => match normalize_json_object_field("metadata", Some(value)) {
Ok(metadata) => Some(metadata),
Err(resp) => return resp,
},
None => None,
};
let input = UpdateBillingProviderConnectionInput {
owner_kind: body
.owner_kind
.as_deref()
.and_then(|value| normalize_optional_text_field(Some(value)))
.map(|value| value.to_ascii_lowercase()),
owner_id: body
.owner_id
.as_deref()
.and_then(|value| normalize_optional_text_field(Some(value))),
mode,
status: body
.status
.as_deref()
.and_then(|value| normalize_optional_text_field(Some(value))),
credential_kind,
provider_account_id: body
.provider_account_id
.as_deref()
.and_then(|value| normalize_optional_text_field(Some(value))),
provider_profile_id,
scopes,
config,
metadata,
};
let connection =
match update_billing_provider_connection(&pool, path.connection_id, &input).await {
Ok(connection) => connection,
Err(error) => {
return billing_store_error_response("Failed to update billing connection", error);
}
};
let connection = match billing_connection_view(&client_name, connection) {
Ok(connection) => connection,
Err(resp) => return resp,
};
api_success(
"Updated billing connection",
json!({
"clientName": client_name,
"connection": connection
}),
)
}
#[delete("/admin/billing/clients/{client_name}/connections/{connection_id}")]
async fn admin_delete_billing_connection(
req: HttpRequest,
app_state: Data<AppState>,
path: Path<BillingConnectionPath>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let path = path.into_inner();
let client_name = path.client_name.trim().to_string();
if client_name.is_empty() {
return bad_request("Invalid client_name", "client_name must not be empty.");
}
let pool = match pool_for_client(app_state.get_ref(), &client_name).await {
Ok(pool) => pool,
Err(resp) => return resp,
};
let connection = match delete_billing_provider_connection(&pool, path.connection_id).await {
Ok(connection) => connection,
Err(error) => {
return billing_store_error_response("Failed to delete billing connection", error);
}
};
let connection = match billing_connection_view(&client_name, connection) {
Ok(connection) => connection,
Err(resp) => return resp,
};
api_success(
"Deleted billing connection",
json!({
"clientName": client_name,
"connection": connection
}),
)
}
#[post("/admin/billing/clients/{client_name}/connections/{connection_id}/reconcile")]
async fn admin_reconcile_billing_document(
req: HttpRequest,
app_state: Data<AppState>,
path: Path<BillingConnectionPath>,
body: web::Json<ReconcileBillingDocumentRequest>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let path = path.into_inner();
let client_name = path.client_name.trim().to_string();
if client_name.is_empty() {
return bad_request("Invalid client_name", "client_name must not be empty.");
}
let provider_ref = match normalize_required_text_field("providerRef", &body.provider_ref) {
Ok(value) => value,
Err(resp) => return resp,
};
let pool = match pool_for_client(app_state.get_ref(), &client_name).await {
Ok(pool) => pool,
Err(resp) => return resp,
};
let reconcile_request = BillingProviderRefetchRequest {
resource_kind: body.resource_kind,
provider_ref,
provider_customer_ref: normalize_optional_text_field(body.provider_customer_ref.as_deref()),
};
let reconcile = match reconcile_billing_document_for_connection(
&pool,
path.connection_id,
&reconcile_request,
)
.await
{
Ok(result) => result,
Err(error) => {
return billing_store_error_response("Failed to reconcile billing document", error);
}
};
let auth_sync_config = match BillingAuthSyncConfig::from_env() {
Ok(config) => config,
Err(error) => {
return billing_auth_sync_error_response(
"Billing reconcile auth sync configuration is invalid",
&error,
);
}
};
let auth_sync_result = match auth_sync_config.as_ref() {
Some(config) => {
match dispatch_billing_auth_right_sync(
&app_state.client,
config,
&reconcile.auth_right_sync,
)
.await
{
Ok(result) => result,
Err(error) => {
if config.fail_mode
== crate::data::billing_auth_sync::BillingAuthSyncFailMode::Required
{
return billing_auth_sync_error_response(
"Billing reconcile auth sync failed",
&error,
);
}
billing_auth_sync_failure_status(config, &error)
}
}
}
None => billing_auth_sync_not_configured_status(),
};
let reconcile_payload = billing_reconcile_response_payload(&reconcile, &auth_sync_result);
let trigger = GatewayWebhookTrigger {
client_name: client_name.clone(),
route_key: ROUTE_BILLING_DOCUMENT_UPSERTED.to_string(),
table_name: Some(reconcile.upsert.table.clone()),
request_id: req
.headers()
.get("x-request-id")
.and_then(|value| value.to_str().ok())
.map(ToString::to_string),
request_method: req.method().as_str().to_string(),
request_path: req.path().to_string(),
headers: request_headers(&req),
payload: Some(json!({
"operation": "reconcile",
"connectionId": path.connection_id,
"resourceKind": reconcile.refetch.resource_kind,
"providerRef": reconcile.refetch.provider_ref,
})),
response: Some(reconcile_payload.clone()),
};
spawn_gateway_webhook_sink_dispatch(app_state.clone(), trigger);
let response_data = billing_route_success_data(
reconcile_payload,
&[
("client_name", json!(client_name)),
("connection_id", json!(path.connection_id)),
("route_key", json!(ROUTE_BILLING_DOCUMENT_UPSERTED)),
],
);
api_success("Reconciled billing document", Value::Object(response_data))
}
#[post("/billing/providers/{provider}/clients/{client_name}/connections/{connection_id}/webhook")]
async fn provider_webhook(
req: HttpRequest,
app_state: Data<AppState>,
path: Path<BillingWebhookPath>,
body: Bytes,
) -> impl Responder {
let path = path.into_inner();
let provider = path.provider.trim().to_ascii_lowercase();
if provider.is_empty() {
return bad_request(
"Invalid billing webhook path",
"provider and client_name must not be empty.",
);
}
let (client_name, pool) =
match resolve_billing_client_pool(app_state.get_ref(), &path.client_name).await {
Ok(result) => result,
Err(resp) => return resp,
};
let connection = match get_billing_provider_connection(&pool, path.connection_id).await {
Ok(connection) => connection,
Err(error) => {
return billing_store_error_response("Failed to load billing connection", error);
}
};
if connection.provider != provider {
return error_response_with_code(
actix_web::http::StatusCode::BAD_REQUEST,
"Billing provider mismatch",
format!(
"Connection '{}' is configured for provider '{}', not '{}'.",
path.connection_id, connection.provider, provider
),
"BILLING_PROVIDER_MISMATCH",
Some(json!({
"connection_id": path.connection_id,
"stored_provider": connection.provider,
"requested_provider": provider
})),
);
}
if let Err(response) =
enforce_billing_webhook_athena_base_url(&req, &connection, &client_name.requested)
{
return response;
}
let signature_headers = signature_headers(&req, &provider);
if let Some(payload) = maybe_unsigned_mollie_hook_ping_response(
&provider,
&client_name.requested,
&connection,
body.as_ref(),
&signature_headers,
) {
return api_success("Billing webhook ignored", payload);
}
let persistence = match ingest_and_upsert_billing_webhook_for_connection(
&pool,
path.connection_id,
body.as_ref(),
&signature_headers,
)
.await
{
Ok(result) => result,
Err(error) => {
return billing_store_error_response("Billing webhook ingestion failed", error);
}
};
let Some(persistence) = persistence else {
return api_success(
"Billing webhook ignored",
json!({
"provider": provider,
"client_name": client_name.requested,
"connection_id": path.connection_id,
"handled": false
}),
);
};
let auth_sync_config = match BillingAuthSyncConfig::from_env() {
Ok(config) => config,
Err(error) => {
return billing_auth_sync_error_response(
"Billing webhook auth sync configuration is invalid",
&error,
);
}
};
let auth_sync_result = match auth_sync_config.as_ref() {
Some(config) => match dispatch_billing_auth_right_sync(
&app_state.client,
config,
&persistence.auth_right_sync,
)
.await
{
Ok(result) => result,
Err(error) => {
if config.fail_mode
== crate::data::billing_auth_sync::BillingAuthSyncFailMode::Required
{
return billing_auth_sync_error_response(
"Billing webhook auth sync failed",
&error,
);
}
billing_auth_sync_failure_status(config, &error)
}
},
None => billing_auth_sync_not_configured_status(),
};
let persistence_payload = billing_webhook_response_payload(&persistence, &auth_sync_result);
let trigger = GatewayWebhookTrigger {
client_name: client_name.registry.clone(),
route_key: ROUTE_BILLING_DOCUMENT_UPSERTED.to_string(),
table_name: Some(persistence.upsert.table.clone()),
request_id: req
.headers()
.get("x-request-id")
.and_then(|value| value.to_str().ok())
.map(ToString::to_string),
request_method: req.method().as_str().to_string(),
request_path: req.path().to_string(),
headers: request_headers(&req),
payload: Some(json!({
"provider": provider,
"connectionId": path.connection_id,
"signatureHeaders": signature_headers,
})),
response: Some(persistence_payload.clone()),
};
spawn_gateway_webhook_sink_dispatch(app_state.clone(), trigger);
let response_data = billing_route_success_data(
persistence_payload,
&[
("provider", json!(provider)),
("client_name", json!(client_name.requested)),
("connection_id", json!(path.connection_id)),
("handled", json!(true)),
("route_key", json!(ROUTE_BILLING_DOCUMENT_UPSERTED)),
],
);
api_success("Billing webhook ingested", Value::Object(response_data))
}
#[get("/admin/billing/clients/{client_name}/webhook-events")]
async fn admin_list_billing_webhook_events(
req: HttpRequest,
app_state: Data<AppState>,
path: Path<BillingConnectionsClientPath>,
query: Query<BillingWebhookEventsListQuery>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let client_name = path.client_name.trim().to_string();
if client_name.is_empty() {
return bad_request("Invalid client_name", "client_name must not be empty.");
}
let pool = match pool_for_client(app_state.get_ref(), &client_name).await {
Ok(pool) => pool,
Err(resp) => return resp,
};
let filters = BillingWebhookEventListFilters {
connection_id: query.connection_id,
provider: normalize_optional_text_field(query.provider.as_deref())
.map(|provider| provider.to_ascii_lowercase()),
limit: query.limit.unwrap_or(100).clamp(1, 500),
offset: query.offset.unwrap_or(0).max(0),
};
let events = match list_billing_webhook_events(&pool, &filters).await {
Ok(events) => events,
Err(error) => {
return billing_store_error_response("Failed to list billing webhook events", error);
}
};
api_success(
"Listed billing webhook events",
json!({
"clientName": client_name,
"events": events,
"limit": filters.limit,
"offset": filters.offset
}),
)
}
#[get("/admin/billing/grants")]
async fn admin_billing_grants(req: HttpRequest) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
api_success("Listed billing grants", billing_grants_payload())
}
#[get("/admin/billing/providers")]
async fn admin_billing_providers(req: HttpRequest) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let integration = billing_providers_integration_contract(ROUTE_BILLING_DOCUMENT_UPSERTED);
api_success(
"Listed billing providers",
json!({
"providers": billing_provider_catalog(),
"webhookRouteTemplate": "/billing/providers/{provider}/clients/{client_name}/connections/{connection_id}/webhook",
"canonicalDocumentKinds": shared_billing_document_kinds(),
"reconcileResourceKinds": shared_billing_reconcile_resource_kinds(),
"integration": {
"billingGrantsEndpoint": integration.billing_grants_endpoint,
"webhookSinkHelperEndpoint": integration.webhook_sink_helper_endpoint,
"webhookSinkProvisionEndpoint": integration.webhook_sink_provision_endpoint,
"webhookRouteTrigger": integration.webhook_route_trigger,
"connectionCreateEndpoint": integration.connection_create_endpoint,
"connectionSeedSql": integration.connection_seed_sql,
"connectionSeedSqlExamples": billing_connection_seed_sql_examples(),
"connectionRequestExamples": billing_connection_request_examples(),
"seedSql": integration.seed_sql,
"tableSql": integration.table_sql,
"bootstrap": integration.bootstrap
}
}),
)
}
#[get("/admin/webhook-sinks/helpers/billing")]
async fn admin_billing_sink_helpers(
req: HttpRequest,
query: Query<BillingSinkHelperQuery>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let source_client_name = query.client_name.trim();
if source_client_name.is_empty() {
return bad_request("Invalid client_name", "client_name must not be empty.");
}
let target_client_name = query
.target_client_name
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
.unwrap_or(source_client_name);
let target_schema = match resolved_billing_sink_target_schema(query.target_schema.as_deref()) {
Ok(value) => value,
Err(resp) => return resp,
};
let raw_query = serde_urlencoded::from_str::<HashMap<String, String>>(req.query_string())
.unwrap_or_default();
let athena_base_url = query
.athena_base_url
.as_deref()
.or(raw_query.get("athena_base_url").map(String::as_str))
.map(str::trim)
.filter(|value| !value.is_empty());
let definitions = billing_sink_definitions(
source_client_name,
target_client_name,
&target_schema,
athena_base_url,
);
let helpers = definitions
.iter()
.map(|definition| definition.helper.clone())
.collect::<Vec<BillingSinkHelperDefinition>>();
api_success(
"Listed billing webhook sink helpers",
json!({
"sourceClientName": source_client_name,
"targetClientName": target_client_name,
"targetSchema": target_schema,
"athenaBaseUrl": athena_base_url,
"route_key": ROUTE_BILLING_DOCUMENT_UPSERTED,
"table_sql": "sql/billing.sql",
"tableSql": billing_tables_sql(),
"target_table_sql_example": "sql/billing_target_schema_billing_example.sql",
"targetTableSqlExample": billing_target_schema_billing_example_sql(),
"targetTableSql": shared_render_billing_target_schema_sql(&target_schema),
"seed_sql": "sql/billing_webhook_sinks_seed.sql",
"seedSql": shared_render_billing_sink_seed_sql(
source_client_name,
target_client_name,
ROUTE_BILLING_DOCUMENT_UPSERTED,
&target_schema,
athena_base_url,
),
"integration": billing_sink_integration_contract(ROUTE_BILLING_DOCUMENT_UPSERTED),
"definitions": definitions,
"helpers": helpers
}),
)
}
#[post("/admin/billing/clients/{client_name}/webhook-sinks/provision")]
async fn admin_provision_billing_webhook_sinks(
req: HttpRequest,
app_state: Data<AppState>,
path: Path<BillingConnectionsClientPath>,
body: web::Json<ProvisionBillingSinksRequest>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let client_name = path.client_name.trim().to_string();
if client_name.is_empty() {
return bad_request("Invalid client_name", "client_name must not be empty.");
}
let catalog_pool = match pool_for_client(app_state.get_ref(), &client_name).await {
Ok(pool) => pool,
Err(resp) => return resp,
};
let target_client_name = body
.target_client_name
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
.unwrap_or(client_name.as_str())
.to_string();
let target_schema = match resolved_billing_sink_target_schema(body.target_schema.as_deref()) {
Ok(value) => value,
Err(resp) => return resp,
};
let athena_base_url = body
.athena_base_url
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty());
let enabled = body.enabled.unwrap_or(true);
let definitions = shared_billing_sink_resolved_definitions(
&client_name,
&target_client_name,
ROUTE_BILLING_DOCUMENT_UPSERTED,
&target_schema,
athena_base_url,
enabled,
);
let mut sinks = Vec::<GatewayWebhookSinkRecord>::new();
for params in definitions
.iter()
.cloned()
.map(|definition| definition.create_params)
.map(
|params: BillingSinkCreateParams| CreateGatewayWebhookSinkParams {
name: params.name,
description: params.description,
enabled: params.enabled,
athena_base_url: params.athena_base_url,
client_name: params.client_name,
table_name: Some(params.table_name),
route_key: params.route_key,
target_client_name: params.target_client_name,
target_table: params.target_table,
target_write_mode: Some(params.target_write_mode),
target_conflict_columns: Some(params.target_conflict_columns),
projection_template: params.projection_template,
include_request_body_in_context: params.include_request_body_in_context,
lookup_keys: params.lookup_keys,
slug: Some(params.slug),
state_key_template: params.state_key_template,
state_cap_max_fires: params.state_cap_max_fires,
state_cap_window_seconds: params.state_cap_window_seconds,
idempotency_template: params.idempotency_template,
state_resolution_mode: params.state_resolution_mode,
state_resolution_query_builder: params.state_resolution_query_builder,
},
)
{
match upsert_gateway_webhook_sink(&catalog_pool, params).await {
Ok(record) => sinks.push(record),
Err(error) => {
return error_response_with_code(
actix_web::http::StatusCode::INTERNAL_SERVER_ERROR,
"Failed to provision billing webhook sinks",
format!("Could not save the canonical billing sink set: {error}"),
WEBHOOK_ERROR_CODE_WEBHOOK_SINK_PROVISION_FAILED,
None,
);
}
}
}
api_success(
"Provisioned billing webhook sinks",
json!({
"clientName": client_name,
"targetClientName": target_client_name,
"targetSchema": target_schema,
"route_key": ROUTE_BILLING_DOCUMENT_UPSERTED,
"table_sql": "sql/billing.sql",
"tableSql": billing_tables_sql(),
"target_table_sql_example": "sql/billing_target_schema_billing_example.sql",
"targetTableSqlExample": billing_target_schema_billing_example_sql(),
"targetTableSql": shared_render_billing_target_schema_sql(&target_schema),
"seed_sql": "sql/billing_webhook_sinks_seed.sql",
"seedSql": shared_render_billing_sink_seed_sql(
&client_name,
&target_client_name,
ROUTE_BILLING_DOCUMENT_UPSERTED,
&target_schema,
athena_base_url,
),
"integration": billing_sink_integration_contract(ROUTE_BILLING_DOCUMENT_UPSERTED),
"definitions": definitions,
"sinks": sinks
}),
)
}
pub fn services(cfg: &mut web::ServiceConfig) {
cfg.service(admin_list_billing_connections)
.service(admin_list_billing_webhook_events)
.service(admin_create_billing_connection)
.service(admin_get_billing_connection)
.service(admin_update_billing_connection)
.service(admin_delete_billing_connection)
.service(admin_reconcile_billing_document)
.service(admin_provision_billing_webhook_sinks)
.service(provider_webhook)
.service(admin_billing_grants)
.service(admin_billing_providers)
.service(admin_billing_sink_helpers);
}
#[cfg(test)]
mod tests {
use super::{
BillingProviderResourceKind, BillingStoreError, BillingWebhookEventsListQuery,
ROUTE_BILLING_DOCUMENT_UPSERTED, ReconcileBillingDocumentRequest, admin_billing_providers,
admin_billing_sink_helpers, billing_connection_webhook_path, billing_grants_payload,
billing_right_resolution_payload, billing_route_success_data,
billing_seed_mollie_example_sql, billing_seed_stripe_example_sql,
billing_sink_create_params, billing_sink_helpers, billing_store_error_response,
billing_tables_sql, billing_target_schema_billing_example_sql,
billing_webhook_sinks_seed_sql, default_billing_credential_kind,
enforce_billing_webhook_athena_base_url, enrich_billing_connection_config,
maybe_unsigned_mollie_hook_ping_response, resolve_billing_client_name,
resolve_updated_billing_connection_config, resolved_billing_sink_target_schema,
shared_billing_document_kinds, shared_billing_reconcile_resource_kinds, signature_headers,
};
use crate::data::billing::BillingProviderConnectionRecord;
use crate::test_support::{ATHENA_TEST_ADMIN_KEY, AthAdminKeyGuard};
use actix_web::App;
use actix_web::body::to_bytes;
use actix_web::http::header::{HOST, HeaderName, HeaderValue};
use actix_web::test::{TestRequest, call_service, init_service, read_body_json};
use athena_billing::providers::billing_provider_catalog;
use athena_billing::sinks::{
DEFAULT_BILLING_SINK_SOURCE_CLIENT_NAME, DEFAULT_BILLING_SINK_TARGET_CLIENT_NAME,
DEFAULT_BILLING_SINK_TARGET_SCHEMA,
render_default_billing_sink_seed_sql as shared_render_default_billing_sink_seed_sql,
};
use chrono::Utc;
use serde_json::json;
use uuid::Uuid;
#[test]
fn billing_sink_helpers_use_billing_route_key_and_table_scopes() {
let helpers = billing_sink_helpers(
"athena-main",
"athena-replica",
"billing",
Some("https://athena.example"),
);
assert_eq!(helpers.len(), 3);
assert_eq!(
helpers[0].create_body["routeKey"].as_str(),
Some(ROUTE_BILLING_DOCUMENT_UPSERTED)
);
assert_eq!(
helpers[0].create_body["athenaBaseUrl"].as_str(),
Some("https://athena.example")
);
assert_eq!(
helpers[0].create_body["tableName"].as_str(),
Some("billing.billing_payments")
);
assert_eq!(helpers[0].document_kind, "payment");
assert_eq!(helpers[0].target_schema, "billing");
assert_eq!(helpers[0].target_table, "billing.billing_payments");
assert_eq!(
helpers[2].create_body["targetClientName"].as_str(),
Some("athena-replica")
);
assert_eq!(
helpers[2].create_body["targetSchema"].as_str(),
Some("billing")
);
assert_eq!(
helpers[0].create_body["targetWriteMode"].as_str(),
Some("upsert")
);
assert_eq!(
helpers[0].create_body["targetConflictColumns"],
json!(["provider", "provider_payment_id"])
);
assert_eq!(
helpers[0].create_body["resiliencePolicy"]["stateResolutionMode"].as_str(),
Some("template")
);
assert_eq!(
helpers[1].create_body["projectionTemplate"]["interval"].as_str(),
Some("{{ response.sql_write.row.interval }}")
);
}
#[test]
fn billing_route_success_data_preserves_artifacts_and_adds_http_envelope_fields() {
let base_payload = json!({
"rightProjection": {
"rightKeys": ["invoice.paid"]
},
"authRightSyncResolution": {
"nativeRightModules": [{
"key": "billing",
"rights": [{
"key": "invoice.paid",
"source": "billing",
"resource": "invoice",
"action": "paid",
"description": "Invoice paid"
}]
}]
}
});
let response_data = billing_route_success_data(
base_payload.clone(),
&[
("client_name", json!("athena-main")),
(
"connection_id",
json!("11111111-1111-1111-1111-111111111111"),
),
("route_key", json!(ROUTE_BILLING_DOCUMENT_UPSERTED)),
("handled", json!(true)),
],
);
assert_eq!(response_data["client_name"], json!("athena-main"));
assert_eq!(response_data["handled"], json!(true));
assert_eq!(
response_data["authRightSyncResolution"]["nativeRightModules"][0]["key"],
json!("billing")
);
assert_eq!(
response_data["artifacts"]["rightProjection"]["rightKeys"],
json!(["invoice.paid"])
);
assert_eq!(
response_data["artifacts"]["authRightSyncResolution"]["nativeRightModules"][0]["rights"]
[0]["key"],
json!("invoice.paid")
);
}
#[actix_web::test]
async fn billing_sink_helper_route_threads_optional_athena_base_url() {
let _guard = AthAdminKeyGuard::new();
let app = init_service(App::new().service(admin_billing_sink_helpers)).await;
let request = TestRequest::get()
.uri("/admin/webhook-sinks/helpers/billing?client_name=athena-main&target_client_name=athena-replica&target_schema=billing&athena_base_url=https%3A%2F%2Fathena.example")
.insert_header(("x-athena-key", ATHENA_TEST_ADMIN_KEY))
.to_request();
let response = call_service(&app, request).await;
assert_eq!(response.status(), actix_web::http::StatusCode::OK);
let json: serde_json::Value = read_body_json(response).await;
assert_eq!(
json["data"]["athenaBaseUrl"],
json!("https://athena.example")
);
assert_eq!(
json["data"]["helpers"][0]["createBody"]["athenaBaseUrl"],
json!("https://athena.example")
);
assert_eq!(
json["data"]["definitions"][0]["blueprint"]["slug"],
json!("billing-payments-canonical-mirror")
);
assert_eq!(
json["data"]["definitions"][0]["createParams"]["targetTable"],
json!("billing.billing_payments")
);
assert_eq!(
json["data"]["integration"]["billing_grants_endpoint"],
json!("/admin/billing/grants")
);
assert_eq!(
json["data"]["integration"]["athena_auth"]["preferred_sync_billing_endpoint"],
json!("/admin/rights/sync-billing")
);
let seed_sql = json["data"]["seedSql"]
.as_str()
.expect("seedSql should be present");
assert!(seed_sql.contains("'https://athena.example'"));
let target_table_sql = json["data"]["targetTableSql"]
.as_str()
.expect("targetTableSql should be present");
assert!(target_table_sql.contains("CREATE SCHEMA IF NOT EXISTS billing;"));
assert!(target_table_sql.contains("CREATE TABLE IF NOT EXISTS billing.billing_payments"));
}
#[actix_web::test]
async fn billing_sink_helper_route_accepts_camel_case_query_aliases() {
let _guard = AthAdminKeyGuard::new();
let app = init_service(App::new().service(admin_billing_sink_helpers)).await;
let request = TestRequest::get()
.uri("/admin/webhook-sinks/helpers/billing?clientName=athena-main&targetClientName=athena-replica&targetSchema=billing&athenaBaseUrl=https%3A%2F%2Fathena.example")
.insert_header(("x-athena-key", ATHENA_TEST_ADMIN_KEY))
.to_request();
let response = call_service(&app, request).await;
assert_eq!(response.status(), actix_web::http::StatusCode::OK);
let json: serde_json::Value = read_body_json(response).await;
assert_eq!(json["data"]["sourceClientName"], json!("athena-main"));
assert_eq!(json["data"]["targetClientName"], json!("athena-replica"));
assert_eq!(json["data"]["targetSchema"], json!("billing"));
assert_eq!(
json["data"]["athenaBaseUrl"],
json!("https://athena.example")
);
assert_eq!(
json["data"]["helpers"][0]["createBody"]["targetTable"],
json!("billing.billing_payments")
);
assert_eq!(
json["data"]["integration"]["webhook_route_trigger"],
json!(ROUTE_BILLING_DOCUMENT_UPSERTED)
);
}
#[test]
fn billing_provider_catalog_surfaces_runtime_status() {
let catalog = billing_provider_catalog();
assert_eq!(catalog.len(), 2);
assert_eq!(catalog[0].provider.as_str(), "mollie");
assert_eq!(catalog[0].implementation_status, "implemented");
assert_eq!(
catalog[0].supported_webhook_envelope_kinds,
vec!["classic", "next_gen"]
);
assert_eq!(
catalog[0].supported_webhook_event_types,
vec![
"payment.status_changed",
"payment-link.*",
"sales-invoice.*"
]
);
assert_eq!(catalog[1].provider.as_str(), "stripe");
assert_eq!(catalog[1].implementation_status, "implemented");
assert_eq!(
catalog[1].supported_webhook_envelope_kinds,
vec!["snapshot"]
);
assert!(
catalog[1]
.supported_webhook_event_types
.contains(&"checkout.session.completed".to_string())
);
assert!(catalog[1].supports_webhook_ingestion);
assert!(catalog[1].supports_reconcile);
assert_eq!(
catalog[0].connection_config.default_credential_kind,
"api_key"
);
assert_eq!(
catalog[1].connection_config.default_credential_kind,
"secret_key"
);
}
#[actix_web::test]
async fn billing_provider_discovery_payload_surfaces_bootstrap_metadata() {
let _guard = AthAdminKeyGuard::new();
let app = init_service(App::new().service(admin_billing_providers)).await;
let request = TestRequest::get()
.uri("/admin/billing/providers")
.insert_header(("x-athena-key", ATHENA_TEST_ADMIN_KEY))
.to_request();
let response = call_service(&app, request).await;
assert_eq!(response.status(), actix_web::http::StatusCode::OK);
let json: serde_json::Value = read_body_json(response).await;
assert_eq!(
json["data"]["canonicalDocumentKinds"],
json!(shared_billing_document_kinds())
);
assert_eq!(
json["data"]["reconcileResourceKinds"],
json!(shared_billing_reconcile_resource_kinds())
);
assert_eq!(
json["data"]["integration"]["billingGrantsEndpoint"],
"/admin/billing/grants"
);
assert_eq!(
json["data"]["integration"]["webhookSinkHelperEndpoint"],
"/admin/webhook-sinks/helpers/billing"
);
assert_eq!(
json["data"]["integration"]["webhookSinkProvisionEndpoint"],
"/admin/billing/clients/{client_name}/webhook-sinks/provision"
);
assert_eq!(
json["data"]["integration"]["connectionCreateEndpoint"],
"/admin/billing/clients/{client_name}/connections"
);
assert_eq!(
json["data"]["integration"]["webhookRouteTrigger"],
ROUTE_BILLING_DOCUMENT_UPSERTED
);
assert_eq!(
json["data"]["integration"]["connectionSeedSql"]["mollie"],
"sql/billing_seed_mollie_example.sql"
);
assert_eq!(
json["data"]["integration"]["connectionSeedSql"]["stripe"],
"sql/billing_seed_stripe_example.sql"
);
assert_eq!(
json["data"]["integration"]["connectionSeedSqlExamples"]["mollie"],
json!(billing_seed_mollie_example_sql())
);
assert_eq!(
json["data"]["integration"]["connectionSeedSqlExamples"]["stripe"],
json!(billing_seed_stripe_example_sql())
);
assert_eq!(
json["data"]["integration"]["connectionRequestExamples"]["mollie"]["provider"],
"mollie"
);
assert_eq!(
json["data"]["integration"]["connectionRequestExamples"]["stripe"]["provider"],
"stripe"
);
assert_eq!(
json["data"]["integration"]["bootstrap"]["errorCatalogEndpoint"],
"/admin/errors/catalog"
);
assert_eq!(
json["data"]["integration"]["bootstrap"]["docsBaseUrl"],
"https://docs.athena-cluster.com"
);
assert_eq!(
json["data"]["integration"]["bootstrap"]["athenaAuth"]["preferredSyncBillingEndpoint"],
"/admin/rights/sync-billing"
);
assert_eq!(
json["data"]["providers"][0]["connectionConfig"]["defaultCredentialKind"],
"api_key"
);
assert_eq!(
json["data"]["providers"][0]["connectionConfig"]["fields"][2]["key"],
"apiKey"
);
assert_eq!(
json["data"]["providers"][0]["connectionConfig"]["fields"][2]["secret"],
true
);
assert_eq!(
json["data"]["providers"][1]["connectionConfig"]["fields"][4]["key"],
"accountId"
);
assert_eq!(
json["data"]["providers"][1]["connectionConfig"]["fields"][4]["derivedFromConnectionField"],
"providerProfileId"
);
assert_eq!(
json["data"]["providers"][0]["integration"]["verification"]["verificationModes"],
json!(["classic_unsigned", "next_gen_signature"])
);
assert_eq!(
json["data"]["providers"][0]["integration"]["athenaErrors"][0]["errorNumber"],
4001
);
assert_eq!(
json["data"]["providers"][1]["integration"]["verification"]["signatureHeaderName"],
"stripe-signature"
);
assert_eq!(
json["data"]["providers"][1]["integration"]["connectionReconcileEndpoint"],
"/admin/billing/clients/{client_name}/connections/{connection_id}/reconcile"
);
}
#[test]
fn billing_grants_payload_surfaces_native_rights_and_runtime_kinds() {
let payload = billing_grants_payload();
let grants = payload["grants"]
.as_array()
.expect("grants should be an array");
let dialect_rights = payload["dialectRights"]
.as_array()
.expect("dialectRights should be an array");
let native_rights = payload["nativeRights"]
.as_array()
.expect("nativeRights should be an array");
let translated_rights = payload["translatedRights"]
.as_array()
.expect("translatedRights should be an array");
let source_kinds = payload["sourceKinds"]
.as_array()
.expect("sourceKinds should be an array");
let reconcile_resource_kinds = payload["reconcileResourceKinds"]
.as_array()
.expect("reconcileResourceKinds should be an array");
assert_eq!(
payload["providerCatalogEndpoint"],
"/admin/billing/providers"
);
assert_eq!(payload["counts"]["grants"], json!(grants.len()));
assert_eq!(
payload["counts"]["nativeRights"],
json!(native_rights.len())
);
assert_eq!(
payload["counts"]["translatedRights"],
json!(translated_rights.len())
);
assert_eq!(payload["counts"]["sourceKinds"], json!(source_kinds.len()));
assert_eq!(
payload["counts"]["reconcileResourceKinds"],
json!(reconcile_resource_kinds.len())
);
assert_eq!(dialect_rights.len(), 2);
assert_eq!(payload["dialectRights"][0]["provider"], "mollie");
assert_eq!(
payload["dialectRights"][0]["rights"][0]["canonicalKey"],
"billing.customers.read"
);
assert_eq!(payload["nativeRights"][0]["source"], "billing");
assert_eq!(payload["nativeRightModules"][0]["key"], "billing");
assert_eq!(
payload["nativeRightModules"][0]["rights"][0]["key"],
"billing.customer.manage"
);
assert!(
translated_rights.iter().any(|right| {
right["key"] == "billing.payments.read"
&& right["providers"]
.as_array()
.is_some_and(|providers| providers.len() == 2)
}),
"expected billing.payments.read to merge Mollie and Stripe bindings"
);
assert_eq!(
payload["sourceKinds"],
json!(["payment", "subscription", "invoice"])
);
assert_eq!(
payload["documentKinds"],
json!(["payment", "subscription", "invoice"])
);
assert_eq!(
payload["reconcileResourceKinds"],
json!(["payment", "payment_link", "subscription", "invoice"])
);
assert_eq!(
payload["bootstrap"]["providerCatalogEndpoint"],
"/admin/billing/providers"
);
assert_eq!(
payload["bootstrap"]["errorCatalogEndpoint"],
"/admin/errors/catalog"
);
assert_eq!(
payload["bootstrap"]["canonicalTargetTableSqlExample"],
"sql/billing_target_schema_billing_example.sql"
);
assert_eq!(
payload["bootstrap"]["athenaAuth"]["preferredSyncSourceEndpoint"],
"/admin/rights/sync-source"
);
}
#[actix_web::test]
async fn billing_store_error_response_uses_numbered_provider_specific_codes() {
let stripe_resp = billing_store_error_response(
"Billing webhook ingestion failed",
BillingStoreError::WebhookRejected {
provider: "stripe".to_string(),
details: "missing stripe-signature header".to_string(),
},
);
assert_eq!(
stripe_resp.status(),
actix_web::http::StatusCode::UNAUTHORIZED
);
let stripe_body = to_bytes(stripe_resp.into_body())
.await
.expect("stripe error body should be readable");
let stripe_json: serde_json::Value =
serde_json::from_slice(&stripe_body).expect("stripe error body should be JSON");
assert_eq!(stripe_json["code"], "STRIPE_WEBHOOK_REJECTED");
assert_eq!(stripe_json["error_number"], 4009);
assert_eq!(
stripe_json["docs_url"],
"https://docs.athena-cluster.com/4009"
);
assert_eq!(stripe_json["data"]["provider"], "stripe");
let unsupported_resp = billing_store_error_response(
"Billing webhook ingestion failed",
BillingStoreError::UnsupportedWebhook("event type is not projected".to_string()),
);
assert_eq!(
unsupported_resp.status(),
actix_web::http::StatusCode::UNPROCESSABLE_ENTITY
);
let unsupported_body = to_bytes(unsupported_resp.into_body())
.await
.expect("unsupported error body should be readable");
let unsupported_json: serde_json::Value = serde_json::from_slice(&unsupported_body)
.expect("unsupported error body should be JSON");
assert_eq!(
unsupported_json["code"],
"BILLING_WEBHOOK_PAYLOAD_UNSUPPORTED"
);
assert_eq!(unsupported_json["error_number"], 4010);
assert_eq!(
unsupported_json["docs_url"],
"https://docs.athena-cluster.com/4010"
);
}
#[test]
fn billing_sink_create_params_use_blueprint_targets() {
let params = billing_sink_create_params(
"athena-main",
"athena-replica",
"billing",
Some("https://athena.example"),
true,
);
assert_eq!(params.len(), 3);
assert_eq!(params[0].route_key, ROUTE_BILLING_DOCUMENT_UPSERTED);
assert_eq!(params[0].client_name, "athena-main");
assert_eq!(params[0].target_client_name, "athena-replica");
assert_eq!(params[0].target_table, "billing.billing_payments");
assert_eq!(params[0].target_write_mode.as_deref(), Some("upsert"));
assert_eq!(
params[0].target_conflict_columns,
Some(json!(["provider", "provider_payment_id"]))
);
assert_eq!(
params[0].athena_base_url.as_deref(),
Some("https://athena.example")
);
assert_eq!(
params[0].state_key_template.as_deref(),
Some("{{ response.upsert.provider_ref }}|{{ response.sql_write.table }}")
);
assert_eq!(params[1].target_table, "billing.billing_subscriptions");
assert_eq!(params[2].target_table, "billing.billing_invoices");
}
#[test]
fn billing_sink_target_schema_defaults_and_validation_are_explicit() {
assert_eq!(
resolved_billing_sink_target_schema(None).expect("default target schema"),
DEFAULT_BILLING_SINK_TARGET_SCHEMA
);
assert_eq!(
resolved_billing_sink_target_schema(Some("billing")).expect("custom target schema"),
"billing"
);
assert!(resolved_billing_sink_target_schema(Some("app-billing")).is_err());
}
#[test]
fn signature_header_collection_is_provider_specific() {
let request = TestRequest::default()
.insert_header(("X-Mollie-Signature", "sha256=def"))
.insert_header(("Stripe-Signature", "t=1,v1=sig"))
.to_http_request();
let signatures = signature_headers(&request, "mollie");
assert_eq!(signatures, vec!["sha256=def"]);
assert_eq!(signature_headers(&request, "stripe"), vec!["t=1,v1=sig"]);
}
#[test]
fn billing_client_name_alias_resolution_preserves_requested_value() {
let resolution =
resolve_billing_client_name("suits-formations").expect("client alias should resolve");
assert_eq!(resolution.requested, "suits-formations");
assert_eq!(resolution.registry, "suits-formations");
let unchanged =
resolve_billing_client_name("suits_formations").expect("alias should resolve");
assert_eq!(unchanged.requested, "suits_formations");
assert_eq!(unchanged.registry, "suits-formations");
}
#[actix_web::test]
async fn billing_webhook_base_url_policy_rejects_other_mirrors() {
let connection = BillingProviderConnectionRecord {
id: Uuid::parse_str("11111111-1111-1111-1111-111111111111").unwrap(),
owner_kind: "org".to_string(),
owner_id: "org_123".to_string(),
provider: "mollie".to_string(),
mode: "live".to_string(),
status: "active".to_string(),
credential_kind: "api_key".to_string(),
provider_account_id: "org_abc".to_string(),
provider_profile_id: Some("pfl_123".to_string()),
scopes: json!([]),
config: json!({}),
metadata: json!({
"webhookIngress": {
"athenaBaseUrl": "https://mirror2.athena-cluster.com"
}
}),
created_at: Utc::now(),
updated_at: Utc::now(),
deleted_at: None,
};
let request = TestRequest::post()
.insert_header((HOST, HeaderValue::from_static("mirror3.athena-cluster.com")))
.insert_header((
HeaderName::from_static("x-forwarded-proto"),
HeaderValue::from_static("https"),
))
.to_http_request();
let response =
enforce_billing_webhook_athena_base_url(&request, &connection, "suits-formations")
.expect_err("other mirrors should be rejected");
assert_eq!(response.status(), actix_web::http::StatusCode::FORBIDDEN);
let body = to_bytes(response.into_body()).await.expect("response body");
let json: serde_json::Value = serde_json::from_slice(&body).expect("json response");
assert_eq!(json["code"], "BILLING_WEBHOOK_BASE_URL_MISMATCH");
assert_eq!(json["data"]["client_name"], "suits-formations");
}
#[test]
fn unsigned_mollie_hook_ping_override_requires_explicit_opt_in() {
let body = br#"{"resource":"event","id":"event_123","type":"hook.ping","entityId":"hook_123","createdAt":"2026-07-02T07:23:01.0Z"}"#;
let opt_in_connection = BillingProviderConnectionRecord {
id: Uuid::parse_str("11111111-1111-1111-1111-111111111111").unwrap(),
owner_kind: "org".to_string(),
owner_id: "org_123".to_string(),
provider: "mollie".to_string(),
mode: "live".to_string(),
status: "active".to_string(),
credential_kind: "api_key".to_string(),
provider_account_id: "org_abc".to_string(),
provider_profile_id: Some("pfl_123".to_string()),
scopes: json!([]),
config: json!({}),
metadata: json!({
"webhookIngress": {
"allowUnsignedHookPing": true
}
}),
created_at: Utc::now(),
updated_at: Utc::now(),
deleted_at: None,
};
let ignored = maybe_unsigned_mollie_hook_ping_response(
"mollie",
"suits-formations",
&opt_in_connection,
body,
&[],
)
.expect("opt-in hook.ping should be ignored");
assert_eq!(ignored["handled"], json!(false));
assert_eq!(ignored["eventType"], json!("hook.ping"));
assert!(
maybe_unsigned_mollie_hook_ping_response(
"mollie",
"suits-formations",
&opt_in_connection,
body,
&["sha256=abc".to_string()],
)
.is_none()
);
let no_opt_in_connection = BillingProviderConnectionRecord {
metadata: json!({}),
..opt_in_connection
};
assert!(
maybe_unsigned_mollie_hook_ping_response(
"mollie",
"suits-formations",
&no_opt_in_connection,
body,
&[],
)
.is_none()
);
}
#[test]
fn mollie_connection_config_is_enriched_with_runtime_defaults() {
let config = enrich_billing_connection_config(
"mollie",
"test",
"advanced_access_token",
Some("pfl_123"),
json!({ "apiKey": "secret" }),
)
.expect("config should be valid");
assert_eq!(config["apiBaseUrl"], "https://api.mollie.com");
assert_eq!(config["mode"], "test");
assert_eq!(config["credentialKind"], "advanced_access_token");
assert_eq!(config["profileId"], "pfl_123");
assert_eq!(config["signingSecrets"], json!([]));
}
#[test]
fn stripe_connection_config_is_enriched_with_runtime_defaults() {
let config = enrich_billing_connection_config(
"stripe",
"test",
"secret_key",
Some("acct_123"),
json!({ "apiKey": "sk_test_123" }),
)
.expect("config should be valid");
assert_eq!(config["apiBaseUrl"], "https://api.stripe.com");
assert_eq!(config["mode"], "test");
assert_eq!(config["credentialKind"], "secret_key");
assert_eq!(config["accountId"], "acct_123");
assert_eq!(config["webhookSigningSecrets"], json!([]));
}
#[test]
fn provider_specific_default_credential_kind_matches_supported_runtime_shape() {
assert_eq!(default_billing_credential_kind("mollie"), "api_key");
assert_eq!(default_billing_credential_kind("stripe"), "secret_key");
assert_eq!(default_billing_credential_kind(" Stripe "), "secret_key");
}
#[test]
fn update_config_rebuilds_runtime_json_when_connection_contract_fields_change() {
let current = BillingProviderConnectionRecord {
id: Uuid::parse_str("11111111-1111-1111-1111-111111111111").unwrap(),
owner_kind: "org".to_string(),
owner_id: "org_123".to_string(),
provider: "stripe".to_string(),
mode: "live".to_string(),
status: "active".to_string(),
credential_kind: "secret_key".to_string(),
provider_account_id: "acct_123".to_string(),
provider_profile_id: Some("acct_123".to_string()),
scopes: json!([]),
config: json!({
"apiBaseUrl": "https://api.stripe.com",
"credentialKind": "secret_key",
"apiKey": "sk_live_123",
"mode": "live",
"accountId": "acct_123",
"webhookSigningSecrets": []
}),
metadata: json!({}),
created_at: Utc::now(),
updated_at: Utc::now(),
deleted_at: None,
};
let updated = resolve_updated_billing_connection_config(
¤t,
Some("test"),
Some("restricted_key"),
Some(Some("acct_456")),
None,
)
.expect("config should rebuild")
.expect("config should be returned");
assert_eq!(updated["mode"], "test");
assert_eq!(updated["credentialKind"], "restricted_key");
assert_eq!(updated["accountId"], "acct_456");
assert_eq!(updated["apiKey"], "sk_live_123");
}
#[test]
fn billing_connection_webhook_path_uses_client_and_provider_tuple() {
let connection = BillingProviderConnectionRecord {
id: Uuid::parse_str("11111111-1111-1111-1111-111111111111").unwrap(),
owner_kind: "org".to_string(),
owner_id: "org_123".to_string(),
provider: "mollie".to_string(),
mode: "live".to_string(),
status: "active".to_string(),
credential_kind: "api_key".to_string(),
provider_account_id: "org_abc".to_string(),
provider_profile_id: Some("pfl_123".to_string()),
scopes: json!([]),
config: json!({}),
metadata: json!({}),
created_at: Utc::now(),
updated_at: Utc::now(),
deleted_at: None,
};
assert_eq!(
billing_connection_webhook_path("athena-main", &connection),
"/billing/providers/mollie/clients/athena-main/connections/11111111-1111-1111-1111-111111111111/webhook"
);
}
#[test]
fn billing_webhook_event_query_defaults_can_be_deserialized() {
let query: BillingWebhookEventsListQuery =
serde_urlencoded::from_str("provider=mollie&limit=25&offset=5")
.expect("query should decode");
assert_eq!(query.provider.as_deref(), Some("mollie"));
assert_eq!(query.limit, Some(25));
assert_eq!(query.offset, Some(5));
}
#[test]
fn reconcile_billing_document_request_supports_subscription_customer_ref() {
let body: ReconcileBillingDocumentRequest = serde_json::from_value(json!({
"resourceKind": "subscription",
"providerRef": "sub_123",
"providerCustomerRef": "cst_123"
}))
.expect("request should decode");
assert!(matches!(
body.resource_kind,
BillingProviderResourceKind::Subscription
));
assert_eq!(body.provider_ref, "sub_123");
assert_eq!(body.provider_customer_ref.as_deref(), Some("cst_123"));
}
#[test]
fn billing_tables_sql_matches_provision_billing_prefix() {
let billing_sql = billing_tables_sql().trim();
let provision_sql = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/sql/provision.sql"));
assert!(
provision_sql.contains(billing_sql),
"sql/provision.sql must embed the canonical billing.sql block without drift"
);
}
#[test]
fn billing_tables_sql_allows_stripe_snapshot_webhook_envelopes() {
let billing_sql = billing_tables_sql();
assert!(
billing_sql.contains("'snapshot'"),
"sql/billing.sql must allow Stripe snapshot webhook envelope kinds"
);
assert!(
billing_sql.contains("'secret_key'"),
"sql/billing.sql must allow Stripe secret_key credential kinds"
);
assert!(
billing_sql.contains("'restricted_key'"),
"sql/billing.sql must allow Stripe restricted_key credential kinds"
);
}
#[test]
fn billing_grants_payload_surfaces_provider_specific_seed_files() {
let payload = billing_grants_payload();
assert_eq!(
payload["connections"]["seed_sql"]["mollie"],
json!("sql/billing_seed_mollie_example.sql")
);
assert_eq!(
payload["connections"]["seed_sql"]["stripe"],
json!("sql/billing_seed_stripe_example.sql")
);
assert_eq!(
payload["connections"]["seed_sql_examples"]["mollie"],
json!(billing_seed_mollie_example_sql())
);
assert_eq!(
payload["connections"]["seed_sql_examples"]["stripe"],
json!(billing_seed_stripe_example_sql())
);
assert_eq!(
payload["connections"]["request_examples"]["mollie"]["credentialKind"],
json!("advanced_access_token")
);
assert_eq!(
payload["connections"]["request_examples"]["stripe"]["credentialKind"],
json!("secret_key")
);
assert_eq!(
payload["athena_auth"]["request_shape"]["rightKeys"],
json!(["payment-link.paid"])
);
assert_eq!(
payload["athena_auth"]["sync_rights_source_endpoint"],
json!("/admin/rights/assignment/sync-source")
);
assert_eq!(
payload["athena_auth"]["preferred_sync_source_endpoint"],
json!("/admin/rights/sync-source")
);
assert_eq!(
payload["athena_auth"]["sync_rights_billing_endpoint"],
json!("/admin/rights/assignment/sync-billing")
);
assert_eq!(
payload["athena_auth"]["preferred_sync_billing_endpoint"],
json!("/admin/rights/sync-billing")
);
assert_eq!(
payload["athena_auth"]["runtime_env"]["base_url"],
json!("ATHENA_NATIVE_RIGHT_SYNC_BASE_URL")
);
assert_eq!(
payload["athena_auth"]["grant_named_runtime_env_aliases"]["path"],
json!("ATHENA_NATIVE_GRANT_SYNC_PATH")
);
assert_eq!(
payload["athena_auth"]["request_contract"]["rightKeys"],
json!(
"native Athena Auth sync keys; preferred input contract for modular rights domains"
)
);
assert_eq!(
payload["athena_auth"]["request_contract"]["nativeRightModules"],
json!(
"resolved grouped native-right module descriptors returned by Athena Auth and billing helper output surfaces"
)
);
assert_eq!(
payload["athena_auth"]["request_shape"]["rightModules"][0]["key"],
json!("billing")
);
assert_eq!(
payload["athena_auth"]["request_contract"]["projectionMetadata"],
json!(
"billing document metadata forwarded onto Athena Auth source assignments; payments can carry providerSubscriptionId when recurring provider events project through payment-first webhooks"
)
);
assert_eq!(
payload["athena_auth"]["request_shape"]["metadata"]["providerSubscriptionId"],
json!("sub_123")
);
assert_eq!(
payload["webhook_sinks"]["target_table_sql_example"],
json!("sql/billing_target_schema_billing_example.sql")
);
}
#[test]
fn billing_right_resolution_payload_distinguishes_resolved_and_unknown_keys() {
let payload = billing_right_resolution_payload(&[
"payment-link.paid".to_string(),
"unknown.billing".to_string(),
]);
assert_eq!(
payload["grantKeys"],
json!(["payment-link.paid", "unknown.billing"])
);
assert_eq!(payload["rightKeys"], json!(["payment-link.paid"]));
assert_eq!(
payload["nativeRights"][0]["key"],
json!("payment-link.paid")
);
assert_eq!(payload["nativeRightModules"][0]["key"], json!("billing"));
assert_eq!(
payload["nativeRightModules"][0]["rights"][0]["key"],
json!("payment-link.paid")
);
assert_eq!(payload["unknownRightKeys"], json!(["unknown.billing"]));
}
#[test]
fn billing_webhook_sink_seed_sql_matches_shared_renderer_defaults() {
let rendered = shared_render_default_billing_sink_seed_sql(ROUTE_BILLING_DOCUMENT_UPSERTED);
assert_eq!(
billing_webhook_sinks_seed_sql().trim(),
rendered.trim(),
"sql/billing_webhook_sinks_seed.sql must stay aligned with athena-billing shared sink defaults for {} -> {}",
DEFAULT_BILLING_SINK_SOURCE_CLIENT_NAME,
DEFAULT_BILLING_SINK_TARGET_CLIENT_NAME,
);
}
#[test]
fn billing_target_schema_example_sql_matches_shared_renderer_defaults() {
let rendered = athena_billing::sinks::render_default_billing_target_schema_sql();
assert_eq!(
billing_target_schema_billing_example_sql().trim(),
rendered.trim(),
"sql/billing_target_schema_billing_example.sql must stay aligned with the shared billing target schema renderer"
);
}
}