use athena_billing::canonical::CanonicalBillingDocument;
use athena_billing::errors::BillingAdapterError;
use athena_billing::grants::{
BillingAuthRightSyncRequest, BillingRightProjection, billing_auth_right_sync_request,
billing_right_projection_for_document,
};
use athena_billing::providers::{
BillingProviderRefetchProjection, BillingProviderRefetchRequest, BillingWebhookEventSummary,
project_refetch_for_provider, project_webhook_for_provider,
};
use athena_billing::storage::{
BillingSqlDocumentTable, BillingSqlDocumentWrite, billing_sql_write_for_document,
};
use athena_billing::{
BillingCapabilitiesDocument, BillingConnectionMode, BillingConnectionStatus, BillingProvider,
BillingProviderConnection, capabilities_from_connection,
};
use athena_billing::{BillingOwnerKind, BillingSubjectRef};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use sqlx::FromRow;
use sqlx::postgres::PgPool;
use thiserror::Error;
use uuid::Uuid;
#[derive(Debug, Error)]
pub enum BillingStoreError {
#[error("billing provider connection not found")]
NotFound,
#[error("billing provider '{0}' is not supported yet")]
UnsupportedProvider(String),
#[error("billing provider connection config is invalid: {0}")]
InvalidConfig(String),
#[error("billing webhook was rejected for provider '{provider}': {details}")]
WebhookRejected { provider: String, details: String },
#[error("billing webhook payload is not supported: {0}")]
UnsupportedWebhook(String),
#[error("billing provider response parse failed: {0}")]
Parse(String),
#[error("billing provider HTTP request failed: {0}")]
Http(String),
#[error("billing adapter failed: {0}")]
Adapter(String),
#[error("database query failed: {0}")]
Database(#[from] sqlx::Error),
}
impl From<BillingAdapterError> for BillingStoreError {
fn from(value: BillingAdapterError) -> Self {
match value {
BillingAdapterError::UnsupportedProvider(provider) => {
Self::UnsupportedProvider(provider)
}
BillingAdapterError::InvalidConfig(message) => Self::InvalidConfig(message),
BillingAdapterError::WebhookVerification(message) => {
Self::Adapter(format!("webhook verification failed: {message}"))
}
BillingAdapterError::UnsupportedWebhook(message) => Self::UnsupportedWebhook(message),
BillingAdapterError::Parse(message) => Self::Parse(message),
BillingAdapterError::Http(message) => Self::Http(message),
}
}
}
fn map_billing_adapter_error(provider: &str, error: BillingAdapterError) -> BillingStoreError {
match error {
BillingAdapterError::UnsupportedProvider(provider) => {
BillingStoreError::UnsupportedProvider(provider)
}
BillingAdapterError::InvalidConfig(message) => BillingStoreError::InvalidConfig(message),
BillingAdapterError::WebhookVerification(details) => BillingStoreError::WebhookRejected {
provider: provider.to_ascii_lowercase(),
details,
},
BillingAdapterError::UnsupportedWebhook(message) => {
BillingStoreError::UnsupportedWebhook(message)
}
BillingAdapterError::Parse(message) => BillingStoreError::Parse(message),
BillingAdapterError::Http(message) => BillingStoreError::Http(message),
}
}
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
pub struct BillingProviderConnectionRecord {
pub id: Uuid,
pub owner_kind: String,
pub owner_id: String,
pub provider: String,
pub mode: String,
pub status: String,
pub credential_kind: String,
pub provider_account_id: String,
pub provider_profile_id: Option<String>,
pub scopes: Value,
pub config: Value,
pub metadata: Value,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub deleted_at: Option<DateTime<Utc>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BillingDocumentUpsertOutcome {
pub table: String,
pub provider_ref: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BillingWebhookIngestionResult {
pub connection: BillingProviderConnectionRecord,
pub webhook_event: BillingWebhookEventSummary,
pub webhook_payload: Value,
pub document: CanonicalBillingDocument,
pub sql_write: BillingSqlDocumentWrite,
pub right_projection: BillingRightProjection,
pub auth_right_sync: BillingAuthRightSyncRequest,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BillingWebhookPersistenceResult {
pub event_log_id: Uuid,
pub connection: BillingProviderConnectionRecord,
pub webhook_event: BillingWebhookEventSummary,
pub webhook_payload: Value,
pub document: CanonicalBillingDocument,
pub sql_write: BillingSqlDocumentWrite,
pub right_projection: BillingRightProjection,
pub auth_right_sync: BillingAuthRightSyncRequest,
pub upsert: BillingDocumentUpsertOutcome,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BillingDocumentReconcileResult {
pub connection: BillingProviderConnectionRecord,
pub refetch: BillingProviderRefetchProjection,
pub document: CanonicalBillingDocument,
pub sql_write: BillingSqlDocumentWrite,
pub right_projection: BillingRightProjection,
pub auth_right_sync: BillingAuthRightSyncRequest,
pub upsert: BillingDocumentUpsertOutcome,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BillingRightResolutionView {
pub grant_keys: Vec<String>,
pub right_keys: Vec<String>,
pub native_rights: Vec<athena_rights::AthenaRightDescriptor>,
pub native_right_modules: Vec<athena_rights::AthenaRightModule>,
pub unknown_right_keys: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
#[serde(rename_all = "camelCase")]
pub struct BillingWebhookEventRecord {
pub id: Uuid,
pub connection_id: Uuid,
pub provider: String,
pub envelope_kind: String,
pub verification_mode: String,
pub signature_header_name: Option<String>,
pub event_id: Option<String>,
pub event_type: Option<String>,
pub resource: Option<String>,
pub entity_id: Option<String>,
pub document_kind: String,
pub provider_ref: String,
pub signature_headers: Value,
pub payload: Value,
pub summary: Value,
pub received_at: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateBillingProviderConnectionInput {
pub owner_kind: String,
pub owner_id: String,
pub provider: String,
pub mode: String,
pub status: String,
pub credential_kind: String,
pub provider_account_id: String,
pub provider_profile_id: Option<String>,
pub scopes: Value,
pub config: Value,
pub metadata: Value,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct UpdateBillingProviderConnectionInput {
pub owner_kind: Option<String>,
pub owner_id: Option<String>,
pub mode: Option<String>,
pub status: Option<String>,
pub credential_kind: Option<String>,
pub provider_account_id: Option<String>,
pub provider_profile_id: Option<Option<String>>,
pub scopes: Option<Value>,
pub config: Option<Value>,
pub metadata: Option<Value>,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct BillingProviderConnectionListFilters {
pub owner_kind: Option<String>,
pub owner_id: Option<String>,
pub provider: Option<String>,
pub mode: Option<String>,
pub include_deleted: bool,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct BillingWebhookEventListFilters {
pub connection_id: Option<Uuid>,
pub provider: Option<String>,
pub limit: i64,
pub offset: i64,
}
pub async fn get_billing_provider_connection(
pool: &PgPool,
id: Uuid,
) -> Result<BillingProviderConnectionRecord, BillingStoreError> {
sqlx::query_as::<_, BillingProviderConnectionRecord>(
r#"
SELECT
id,
owner_kind,
owner_id,
provider,
mode,
status,
credential_kind,
provider_account_id,
provider_profile_id,
scopes,
config,
metadata,
created_at,
updated_at,
deleted_at
FROM billing.billing_provider_connections
WHERE id = $1
AND deleted_at IS NULL
"#,
)
.bind(id)
.fetch_optional(pool)
.await?
.ok_or(BillingStoreError::NotFound)
}
pub async fn list_billing_provider_connections(
pool: &PgPool,
filters: &BillingProviderConnectionListFilters,
) -> Result<Vec<BillingProviderConnectionRecord>, BillingStoreError> {
sqlx::query_as::<_, BillingProviderConnectionRecord>(
r#"
SELECT
id,
owner_kind,
owner_id,
provider,
mode,
status,
credential_kind,
provider_account_id,
provider_profile_id,
scopes,
config,
metadata,
created_at,
updated_at,
deleted_at
FROM billing.billing_provider_connections
WHERE ($1::text IS NULL OR owner_kind = $1)
AND ($2::text IS NULL OR owner_id = $2)
AND ($3::text IS NULL OR provider = $3)
AND ($4::text IS NULL OR mode = $4)
AND ($5::boolean OR deleted_at IS NULL)
ORDER BY updated_at DESC, created_at DESC
"#,
)
.bind(filters.owner_kind.as_deref())
.bind(filters.owner_id.as_deref())
.bind(filters.provider.as_deref())
.bind(filters.mode.as_deref())
.bind(filters.include_deleted)
.fetch_all(pool)
.await
.map_err(BillingStoreError::from)
}
pub async fn list_billing_webhook_events(
pool: &PgPool,
filters: &BillingWebhookEventListFilters,
) -> Result<Vec<BillingWebhookEventRecord>, BillingStoreError> {
sqlx::query_as::<_, BillingWebhookEventRecord>(
r#"
SELECT
id,
connection_id,
provider,
envelope_kind,
verification_mode,
signature_header_name,
event_id,
event_type,
resource,
entity_id,
document_kind,
provider_ref,
signature_headers,
payload,
summary,
received_at
FROM billing.billing_webhook_events
WHERE ($1::uuid IS NULL OR connection_id = $1)
AND ($2::text IS NULL OR provider = $2)
ORDER BY received_at DESC, id DESC
LIMIT $3
OFFSET $4
"#,
)
.bind(filters.connection_id)
.bind(filters.provider.as_deref())
.bind(filters.limit)
.bind(filters.offset)
.fetch_all(pool)
.await
.map_err(BillingStoreError::from)
}
pub async fn create_billing_provider_connection(
pool: &PgPool,
input: &CreateBillingProviderConnectionInput,
) -> Result<BillingProviderConnectionRecord, BillingStoreError> {
sqlx::query_as::<_, BillingProviderConnectionRecord>(
r#"
INSERT INTO billing.billing_provider_connections (
owner_kind,
owner_id,
provider,
mode,
status,
credential_kind,
provider_account_id,
provider_profile_id,
scopes,
config,
metadata
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
RETURNING
id,
owner_kind,
owner_id,
provider,
mode,
status,
credential_kind,
provider_account_id,
provider_profile_id,
scopes,
config,
metadata,
created_at,
updated_at,
deleted_at
"#,
)
.bind(&input.owner_kind)
.bind(&input.owner_id)
.bind(&input.provider)
.bind(&input.mode)
.bind(&input.status)
.bind(&input.credential_kind)
.bind(&input.provider_account_id)
.bind(&input.provider_profile_id)
.bind(&input.scopes)
.bind(&input.config)
.bind(&input.metadata)
.fetch_one(pool)
.await
.map_err(BillingStoreError::from)
}
pub async fn update_billing_provider_connection(
pool: &PgPool,
connection_id: Uuid,
input: &UpdateBillingProviderConnectionInput,
) -> Result<BillingProviderConnectionRecord, BillingStoreError> {
sqlx::query_as::<_, BillingProviderConnectionRecord>(
r#"
UPDATE billing.billing_provider_connections
SET
owner_kind = COALESCE($2, owner_kind),
owner_id = COALESCE($3, owner_id),
mode = COALESCE($4, mode),
status = COALESCE($5, status),
credential_kind = COALESCE($6, credential_kind),
provider_account_id = COALESCE($7, provider_account_id),
provider_profile_id = CASE
WHEN $8 THEN $9
ELSE provider_profile_id
END,
scopes = COALESCE($10, scopes),
config = COALESCE($11, config),
metadata = COALESCE($12, metadata),
updated_at = now()
WHERE id = $1
AND deleted_at IS NULL
RETURNING
id,
owner_kind,
owner_id,
provider,
mode,
status,
credential_kind,
provider_account_id,
provider_profile_id,
scopes,
config,
metadata,
created_at,
updated_at,
deleted_at
"#,
)
.bind(connection_id)
.bind(input.owner_kind.as_deref())
.bind(input.owner_id.as_deref())
.bind(input.mode.as_deref())
.bind(input.status.as_deref())
.bind(input.credential_kind.as_deref())
.bind(input.provider_account_id.as_deref())
.bind(input.provider_profile_id.is_some())
.bind(input.provider_profile_id.clone().flatten())
.bind(input.scopes.as_ref())
.bind(input.config.as_ref())
.bind(input.metadata.as_ref())
.fetch_optional(pool)
.await?
.ok_or(BillingStoreError::NotFound)
}
pub async fn delete_billing_provider_connection(
pool: &PgPool,
connection_id: Uuid,
) -> Result<BillingProviderConnectionRecord, BillingStoreError> {
sqlx::query_as::<_, BillingProviderConnectionRecord>(
r#"
UPDATE billing.billing_provider_connections
SET
deleted_at = now(),
updated_at = now()
WHERE id = $1
AND deleted_at IS NULL
RETURNING
id,
owner_kind,
owner_id,
provider,
mode,
status,
credential_kind,
provider_account_id,
provider_profile_id,
scopes,
config,
metadata,
created_at,
updated_at,
deleted_at
"#,
)
.bind(connection_id)
.fetch_optional(pool)
.await?
.ok_or(BillingStoreError::NotFound)
}
pub async fn ingest_billing_webhook_for_connection(
pool: &PgPool,
connection_id: Uuid,
body: &[u8],
signature_headers: &[String],
) -> Result<Option<BillingWebhookIngestionResult>, BillingStoreError> {
let connection = get_billing_provider_connection(pool, connection_id).await?;
let Some(_) = BillingProvider::parse(&connection.provider) else {
return Err(BillingStoreError::UnsupportedProvider(
connection.provider.clone(),
));
};
ingest_supported_provider_webhook_for_connection(connection, body, signature_headers).await
}
async fn ingest_supported_provider_webhook_for_connection(
connection: BillingProviderConnectionRecord,
body: &[u8],
signature_headers: &[String],
) -> Result<Option<BillingWebhookIngestionResult>, BillingStoreError> {
let projection = project_webhook_for_provider(
&connection.provider,
&connection.config,
body,
signature_headers,
)
.await
.map_err(|error| map_billing_adapter_error(&connection.provider, error))?;
let Some(document) = projection.document else {
return Ok(None);
};
let sql_write = billing_sql_write_for_document(&document);
let right_projection = billing_right_projection_for_document(&document);
let auth_right_sync = billing_auth_right_sync_request(
&billing_subject_ref_for_connection(&connection)?,
&right_projection,
Some(&connection.metadata),
);
Ok(Some(BillingWebhookIngestionResult {
connection,
webhook_event: projection.event,
webhook_payload: projection.payload,
document,
sql_write,
right_projection,
auth_right_sync,
}))
}
pub async fn upsert_billing_document(
pool: &PgPool,
document: &CanonicalBillingDocument,
) -> Result<BillingDocumentUpsertOutcome, BillingStoreError> {
let write = billing_sql_write_for_document(document);
upsert_billing_sql_write(pool, &write).await
}
pub async fn ingest_and_upsert_billing_webhook_for_connection(
pool: &PgPool,
connection_id: Uuid,
body: &[u8],
signature_headers: &[String],
) -> Result<Option<BillingWebhookPersistenceResult>, BillingStoreError> {
let Some(ingestion) =
ingest_billing_webhook_for_connection(pool, connection_id, body, signature_headers).await?
else {
return Ok(None);
};
let upsert = upsert_billing_sql_write(pool, &ingestion.sql_write).await?;
let event_log_id = record_billing_webhook_event(
pool,
ingestion.connection.id,
&ingestion.webhook_event,
&ingestion.webhook_payload,
&ingestion.document,
&ingestion.sql_write,
signature_headers,
)
.await?;
Ok(Some(BillingWebhookPersistenceResult {
event_log_id,
connection: ingestion.connection,
webhook_event: ingestion.webhook_event,
webhook_payload: ingestion.webhook_payload,
document: ingestion.document,
sql_write: ingestion.sql_write,
right_projection: ingestion.right_projection,
auth_right_sync: ingestion.auth_right_sync,
upsert,
}))
}
pub async fn reconcile_billing_document_for_connection(
pool: &PgPool,
connection_id: Uuid,
request: &BillingProviderRefetchRequest,
) -> Result<BillingDocumentReconcileResult, BillingStoreError> {
let connection = get_billing_provider_connection(pool, connection_id).await?;
let refetch = project_refetch_for_provider(&connection.provider, &connection.config, request)
.await
.map_err(|error| map_billing_adapter_error(&connection.provider, error))?;
let document = refetch.document.clone();
let sql_write = billing_sql_write_for_document(&document);
let right_projection = billing_right_projection_for_document(&document);
let auth_right_sync = billing_auth_right_sync_request(
&billing_subject_ref_for_connection(&connection)?,
&right_projection,
Some(&connection.metadata),
);
let upsert = upsert_billing_sql_write(pool, &sql_write).await?;
Ok(BillingDocumentReconcileResult {
connection,
refetch,
document,
sql_write,
right_projection,
auth_right_sync,
upsert,
})
}
fn billing_subject_ref_for_connection(
connection: &BillingProviderConnectionRecord,
) -> Result<BillingSubjectRef, BillingStoreError> {
let owner_kind = match connection.owner_kind.trim().to_ascii_lowercase().as_str() {
"user" => BillingOwnerKind::User,
"org" => BillingOwnerKind::Org,
"workspace" => BillingOwnerKind::Workspace,
"tenant" => BillingOwnerKind::Tenant,
other => {
return Err(BillingStoreError::InvalidConfig(format!(
"unsupported billing connection owner_kind '{other}'"
)));
}
};
Ok(BillingSubjectRef {
owner_kind,
owner_id: connection.owner_id.clone(),
})
}
pub fn billing_capabilities_for_record(
connection: &BillingProviderConnectionRecord,
) -> Result<BillingCapabilitiesDocument, BillingStoreError> {
let provider = BillingProvider::parse(&connection.provider)
.ok_or_else(|| BillingStoreError::UnsupportedProvider(connection.provider.clone()))?;
let mode = BillingConnectionMode::parse(&connection.mode).ok_or_else(|| {
BillingStoreError::InvalidConfig(format!(
"unsupported billing connection mode '{}'",
connection.mode
))
})?;
let status = BillingConnectionStatus::parse(&connection.status).ok_or_else(|| {
BillingStoreError::InvalidConfig(format!(
"unsupported billing connection status '{}'",
connection.status
))
})?;
let scopes = match &connection.scopes {
Value::Array(values) => values
.iter()
.filter_map(|value| value.as_str().map(ToString::to_string))
.collect(),
other => {
return Err(BillingStoreError::InvalidConfig(format!(
"billing connection scopes must be a JSON array, got {other}"
)));
}
};
Ok(capabilities_from_connection(&BillingProviderConnection {
id: connection.id,
provider,
provider_account_id: connection.provider_account_id.clone(),
provider_profile_id: connection.provider_profile_id.clone(),
mode,
status,
scopes,
config: connection.config.clone(),
metadata: connection.metadata.clone(),
created_at: connection.created_at,
updated_at: connection.updated_at,
deleted_at: connection.deleted_at,
}))
}
pub fn billing_right_resolution_view(keys: &[String]) -> BillingRightResolutionView {
let native_right_catalog = athena_rights::billing_right_catalog();
let selection = athena_rights::resolve_right_selection(
keys.iter().map(String::as_str),
&native_right_catalog,
);
let module_selection = athena_rights::resolve_right_module_selection(
keys.iter().map(String::as_str),
&native_right_catalog,
);
BillingRightResolutionView {
grant_keys: selection.requested_keys,
right_keys: selection.resolved_keys,
native_rights: selection.native_rights,
native_right_modules: module_selection.modules,
unknown_right_keys: selection.unknown_keys,
}
}
fn billing_dispatch_artifacts_payload<S: Serialize>(
document: &CanonicalBillingDocument,
sql_write: &BillingSqlDocumentWrite,
right_projection: &BillingRightProjection,
auth_right_sync: &BillingAuthRightSyncRequest,
auth_sync_result: &S,
upsert: &BillingDocumentUpsertOutcome,
) -> Value {
json!({
"document": document,
"sql_write": sql_write,
"rightProjection": right_projection,
"rightProjectionResolution": billing_right_resolution_view(&right_projection.right_keys),
"authRightSync": auth_right_sync,
"authRightSyncResolution": billing_right_resolution_view(&auth_right_sync.right_keys),
"grant_projection": right_projection,
"grant_projection_rights": billing_right_resolution_view(&right_projection.grant_keys),
"auth_grant_sync": auth_right_sync,
"auth_grant_sync_rights": billing_right_resolution_view(&auth_right_sync.grant_keys),
"auth_sync_result": auth_sync_result,
"upsert": upsert
})
}
pub fn billing_webhook_response_payload<S: Serialize>(
persistence: &BillingWebhookPersistenceResult,
auth_sync_result: &S,
) -> Value {
let mut payload = serde_json::Map::new();
payload.insert("eventLogId".to_string(), json!(persistence.event_log_id));
payload.insert("webhookEvent".to_string(), json!(persistence.webhook_event));
payload.insert(
"webhookPayload".to_string(),
json!(persistence.webhook_payload),
);
payload.insert("connection".to_string(), json!(persistence.connection));
let Value::Object(artifacts) = billing_dispatch_artifacts_payload(
&persistence.document,
&persistence.sql_write,
&persistence.right_projection,
&persistence.auth_right_sync,
auth_sync_result,
&persistence.upsert,
) else {
return Value::Object(payload);
};
payload.extend(artifacts);
Value::Object(payload)
}
pub fn billing_reconcile_response_payload<S: Serialize>(
reconcile: &BillingDocumentReconcileResult,
auth_sync_result: &S,
) -> Value {
let mut payload = serde_json::Map::new();
payload.insert("connection".to_string(), json!(reconcile.connection));
payload.insert("refetch".to_string(), json!(reconcile.refetch));
let Value::Object(artifacts) = billing_dispatch_artifacts_payload(
&reconcile.document,
&reconcile.sql_write,
&reconcile.right_projection,
&reconcile.auth_right_sync,
auth_sync_result,
&reconcile.upsert,
) else {
return Value::Object(payload);
};
payload.extend(artifacts);
Value::Object(payload)
}
pub async fn upsert_billing_sql_write(
pool: &PgPool,
write: &BillingSqlDocumentWrite,
) -> Result<BillingDocumentUpsertOutcome, BillingStoreError> {
match write.table {
BillingSqlDocumentTable::Payments => {
let row = &write.row;
sqlx::query(
r#"
INSERT INTO billing.billing_payments (
provider,
provider_payment_id,
provider_customer_id,
provider_profile_id,
provider_payment_link_id,
status,
amount_currency,
amount_value,
amount_refunded_currency,
amount_refunded_value,
description,
metadata,
raw,
paid_at,
created_at
)
VALUES (
$1, $2, $3, $4, $5, $6, $7, $8::numeric, $9, $10::numeric, $11, $12, $13, $14, $15
)
ON CONFLICT (provider, provider_payment_id)
DO UPDATE SET
provider_customer_id = EXCLUDED.provider_customer_id,
provider_profile_id = EXCLUDED.provider_profile_id,
provider_payment_link_id = EXCLUDED.provider_payment_link_id,
status = EXCLUDED.status,
amount_currency = EXCLUDED.amount_currency,
amount_value = EXCLUDED.amount_value,
amount_refunded_currency = EXCLUDED.amount_refunded_currency,
amount_refunded_value = EXCLUDED.amount_refunded_value,
description = EXCLUDED.description,
metadata = EXCLUDED.metadata,
raw = EXCLUDED.raw,
paid_at = EXCLUDED.paid_at,
created_at = EXCLUDED.created_at,
ingested_at = now()
"#,
)
.bind(row["provider"].as_str())
.bind(row["provider_payment_id"].as_str())
.bind(row["provider_customer_id"].as_str())
.bind(row["provider_profile_id"].as_str())
.bind(row["provider_payment_link_id"].as_str())
.bind(row["status"].as_str())
.bind(row["amount_currency"].as_str())
.bind(row["amount_value"].as_str())
.bind(row["amount_refunded_currency"].as_str())
.bind(row["amount_refunded_value"].as_str())
.bind(row["description"].as_str())
.bind(&row["metadata"])
.bind(&row["raw"])
.bind(row["paid_at"].as_str())
.bind(row["created_at"].as_str())
.execute(pool)
.await?;
}
BillingSqlDocumentTable::Subscriptions => {
let row = &write.row;
sqlx::query(
r#"
INSERT INTO billing.billing_subscriptions (
provider,
provider_subscription_id,
provider_customer_id,
provider_profile_id,
status,
amount_currency,
amount_value,
interval,
description,
metadata,
raw,
next_payment_date,
created_at,
canceled_at
)
VALUES (
$1, $2, $3, $4, $5, $6, $7::numeric, $8, $9, $10, $11, $12, $13, $14
)
ON CONFLICT (provider, provider_subscription_id)
DO UPDATE SET
provider_customer_id = EXCLUDED.provider_customer_id,
provider_profile_id = EXCLUDED.provider_profile_id,
status = EXCLUDED.status,
amount_currency = EXCLUDED.amount_currency,
amount_value = EXCLUDED.amount_value,
interval = EXCLUDED.interval,
description = EXCLUDED.description,
metadata = EXCLUDED.metadata,
raw = EXCLUDED.raw,
next_payment_date = EXCLUDED.next_payment_date,
created_at = EXCLUDED.created_at,
canceled_at = EXCLUDED.canceled_at,
ingested_at = now()
"#,
)
.bind(row["provider"].as_str())
.bind(row["provider_subscription_id"].as_str())
.bind(row["provider_customer_id"].as_str())
.bind(row["provider_profile_id"].as_str())
.bind(row["status"].as_str())
.bind(row["amount_currency"].as_str())
.bind(row["amount_value"].as_str())
.bind(row["interval"].as_str())
.bind(row["description"].as_str())
.bind(&row["metadata"])
.bind(&row["raw"])
.bind(row["next_payment_date"].as_str())
.bind(row["created_at"].as_str())
.bind(row["canceled_at"].as_str())
.execute(pool)
.await?;
}
BillingSqlDocumentTable::Invoices => {
let row = &write.row;
sqlx::query(
r#"
INSERT INTO billing.billing_invoices (
provider,
provider_invoice_id,
provider_profile_id,
provider_customer_id,
status,
amount_currency,
amount_value,
amount_paid_currency,
amount_paid_value,
description,
metadata,
raw,
issued_at,
paid_at,
created_at
)
VALUES (
$1, $2, $3, $4, $5, $6, $7::numeric, $8, $9::numeric, $10, $11, $12, $13, $14, $15
)
ON CONFLICT (provider, provider_invoice_id)
DO UPDATE SET
provider_profile_id = EXCLUDED.provider_profile_id,
provider_customer_id = EXCLUDED.provider_customer_id,
status = EXCLUDED.status,
amount_currency = EXCLUDED.amount_currency,
amount_value = EXCLUDED.amount_value,
amount_paid_currency = EXCLUDED.amount_paid_currency,
amount_paid_value = EXCLUDED.amount_paid_value,
description = EXCLUDED.description,
metadata = EXCLUDED.metadata,
raw = EXCLUDED.raw,
issued_at = EXCLUDED.issued_at,
paid_at = EXCLUDED.paid_at,
created_at = EXCLUDED.created_at,
ingested_at = now()
"#,
)
.bind(row["provider"].as_str())
.bind(row["provider_invoice_id"].as_str())
.bind(row["provider_profile_id"].as_str())
.bind(row["provider_customer_id"].as_str())
.bind(row["status"].as_str())
.bind(row["amount_currency"].as_str())
.bind(row["amount_value"].as_str())
.bind(row["amount_paid_currency"].as_str())
.bind(row["amount_paid_value"].as_str())
.bind(row["description"].as_str())
.bind(&row["metadata"])
.bind(&row["raw"])
.bind(row["issued_at"].as_str())
.bind(row["paid_at"].as_str())
.bind(row["created_at"].as_str())
.execute(pool)
.await?;
}
}
Ok(BillingDocumentUpsertOutcome {
table: write.table.table_name().to_string(),
provider_ref: write.provider_ref.clone(),
})
}
fn billing_document_kind(document: &CanonicalBillingDocument) -> &'static str {
match document {
CanonicalBillingDocument::Payment(_) => "payment",
CanonicalBillingDocument::Subscription(_) => "subscription",
CanonicalBillingDocument::Invoice(_) => "invoice",
}
}
async fn record_billing_webhook_event(
pool: &PgPool,
connection_id: Uuid,
webhook_event: &BillingWebhookEventSummary,
webhook_payload: &Value,
document: &CanonicalBillingDocument,
sql_write: &BillingSqlDocumentWrite,
signature_headers: &[String],
) -> Result<Uuid, BillingStoreError> {
sqlx::query_scalar(
r#"
INSERT INTO billing.billing_webhook_events (
connection_id,
provider,
envelope_kind,
verification_mode,
signature_header_name,
event_id,
event_type,
resource,
entity_id,
document_kind,
provider_ref,
signature_headers,
payload,
summary
)
VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14
)
RETURNING id
"#,
)
.bind(connection_id)
.bind(webhook_event.provider.as_str())
.bind(&webhook_event.envelope_kind)
.bind(&webhook_event.verification_mode)
.bind(webhook_event.signature_header_name.as_deref())
.bind(webhook_event.event_id.as_deref())
.bind(webhook_event.event_type.as_deref())
.bind(webhook_event.resource.as_deref())
.bind(webhook_event.entity_id.as_deref())
.bind(billing_document_kind(document))
.bind(&sql_write.provider_ref)
.bind(serde_json::json!(signature_headers))
.bind(webhook_payload)
.bind(
serde_json::to_value(webhook_event)
.map_err(|error| BillingStoreError::InvalidConfig(error.to_string()))?,
)
.fetch_one(pool)
.await
.map_err(BillingStoreError::from)
}
#[cfg(test)]
mod tests {
use super::{
BillingDocumentReconcileResult, BillingDocumentUpsertOutcome,
BillingProviderConnectionRecord, BillingStoreError, BillingWebhookEventListFilters,
BillingWebhookEventSummary, BillingWebhookPersistenceResult,
billing_capabilities_for_record, billing_document_kind, billing_reconcile_response_payload,
billing_right_resolution_view, billing_subject_ref_for_connection,
billing_webhook_response_payload, ingest_billing_webhook_for_connection,
};
use athena_billing::BillingConnectionMode;
use athena_billing::canonical::{CanonicalBillingDocument, CanonicalInvoice, CanonicalMoney};
use athena_billing::grants::{
billing_auth_right_sync_request, billing_right_projection_for_document,
};
use athena_billing::providers::mollie::{MollieConnectionConfig, MollieCredentialKind};
use athena_billing::providers::{
BillingProviderRefetchProjection, BillingProviderResourceKind,
};
use athena_billing::storage::billing_sql_write_for_document;
use chrono::Utc;
use serde_json::json;
use sqlx::postgres::PgPoolOptions;
use uuid::Uuid;
#[test]
fn billing_store_errors_preserve_invalid_config_details() {
let error = BillingStoreError::from(
athena_billing::errors::BillingAdapterError::InvalidConfig("bad config".to_string()),
);
assert!(
matches!(error, BillingStoreError::InvalidConfig(message) if message == "bad config")
);
}
#[test]
fn billing_store_errors_preserve_parse_and_http_categories() {
let parse = BillingStoreError::from(athena_billing::errors::BillingAdapterError::Parse(
"bad payload".to_string(),
));
assert!(matches!(parse, BillingStoreError::Parse(message) if message == "bad payload"));
let http = BillingStoreError::from(athena_billing::errors::BillingAdapterError::Http(
"gateway timeout".to_string(),
));
assert!(matches!(http, BillingStoreError::Http(message) if message == "gateway timeout"));
}
#[test]
fn mollie_connection_config_round_trips_from_json() {
let config = MollieConnectionConfig {
api_base_url: "https://api.mollie.com".to_string(),
credential_kind: MollieCredentialKind::OrganizationAccessToken,
api_key: "token".to_string(),
mode: BillingConnectionMode::Test,
profile_id: Some("pfl_123".to_string()),
signing_secrets: vec!["secret".to_string()],
};
let decoded: MollieConnectionConfig =
serde_json::from_value(json!(config.clone())).expect("config should deserialize");
assert_eq!(decoded.api_base_url, config.api_base_url);
assert_eq!(decoded.profile_id, config.profile_id);
}
#[test]
fn billing_right_resolution_view_preserves_grant_aliases_and_unknown_keys() {
let resolution = billing_right_resolution_view(&[
"payment-link.paid".to_string(),
"unknown.billing".to_string(),
]);
assert_eq!(
resolution.grant_keys,
vec![
"payment-link.paid".to_string(),
"unknown.billing".to_string()
]
);
assert_eq!(resolution.right_keys, vec!["payment-link.paid".to_string()]);
assert_eq!(resolution.native_rights[0].key, "payment-link.paid");
assert_eq!(
resolution.unknown_right_keys,
vec!["unknown.billing".to_string()]
);
}
#[test]
fn billing_response_payload_builders_surface_shared_artifacts_contract() {
let connection = BillingProviderConnectionRecord {
id: Uuid::parse_str("11111111-1111-1111-1111-111111111111").unwrap(),
owner_kind: "workspace".to_string(),
owner_id: "ws_123".to_string(),
provider: "mollie".to_string(),
mode: "test".to_string(),
status: "active".to_string(),
credential_kind: "api_key".to_string(),
provider_account_id: "org_123".to_string(),
provider_profile_id: Some("pfl_123".to_string()),
scopes: json!(["payments.read"]),
config: json!({}),
metadata: json!({"environment": "test"}),
created_at: Utc::now(),
updated_at: Utc::now(),
deleted_at: None,
};
let document = CanonicalBillingDocument::Invoice(CanonicalInvoice {
provider: athena_billing::BillingProvider::Mollie,
provider_invoice_id: "si_123".to_string(),
provider_profile_id: Some("pfl_123".to_string()),
provider_customer_id: Some("cst_123".to_string()),
status: athena_billing::AthenaInvoiceStatus::Paid,
amount: Some(CanonicalMoney {
currency: "EUR".to_string(),
value: "42.00".to_string(),
}),
amount_paid: Some(CanonicalMoney {
currency: "EUR".to_string(),
value: "42.00".to_string(),
}),
description: Some("Invoice".to_string()),
metadata: json!({}),
raw: json!({"id": "si_123"}),
issued_at: None,
paid_at: None,
created_at: None,
});
let sql_write = billing_sql_write_for_document(&document);
let right_projection = billing_right_projection_for_document(&document);
let auth_right_sync = billing_auth_right_sync_request(
&super::billing_subject_ref_for_connection(&connection).expect("billing subject"),
&right_projection,
Some(&connection.metadata),
);
let auth_sync_result = json!({"status": "not_configured"});
let upsert = BillingDocumentUpsertOutcome {
table: sql_write.table.table_name().to_string(),
provider_ref: sql_write.provider_ref.clone(),
};
let webhook = BillingWebhookPersistenceResult {
event_log_id: Uuid::parse_str("22222222-2222-2222-2222-222222222222").unwrap(),
connection: connection.clone(),
webhook_event: BillingWebhookEventSummary {
provider: athena_billing::BillingProvider::Mollie,
envelope_kind: "next_gen".to_string(),
verification_mode: "next_gen_signature".to_string(),
signature_header_name: Some("x-mollie-signature".to_string()),
resource: Some("event".to_string()),
event_id: Some("event_123".to_string()),
event_type: Some("sales-invoice.paid".to_string()),
entity_id: Some("si_123".to_string()),
created_at: None,
},
webhook_payload: json!({"id": "event_123"}),
document: document.clone(),
sql_write: sql_write.clone(),
right_projection: right_projection.clone(),
auth_right_sync: auth_right_sync.clone(),
upsert: upsert.clone(),
};
let reconcile = BillingDocumentReconcileResult {
connection,
refetch: BillingProviderRefetchProjection {
resource_kind: BillingProviderResourceKind::Invoice,
provider_ref: "si_123".to_string(),
payload: json!({"id": "si_123"}),
document,
},
document: webhook.document.clone(),
sql_write,
right_projection,
auth_right_sync,
upsert,
};
let webhook_payload = billing_webhook_response_payload(&webhook, &auth_sync_result);
assert_eq!(webhook_payload["eventLogId"], json!(webhook.event_log_id));
assert_eq!(
webhook_payload["rightProjectionResolution"]["rightKeys"],
json!(["invoice.paid"])
);
assert_eq!(
webhook_payload["rightProjectionResolution"]["nativeRightModules"][0]["key"],
json!("billing")
);
assert_eq!(
webhook_payload["authRightSyncResolution"]["nativeRights"][0]["key"],
json!("invoice.paid")
);
assert_eq!(
webhook_payload["authRightSync"]["rightModules"][0]["key"],
json!("billing")
);
let reconcile_payload = billing_reconcile_response_payload(&reconcile, &auth_sync_result);
assert_eq!(
reconcile_payload["refetch"]["resourceKind"],
json!("invoice")
);
assert_eq!(
reconcile_payload["rightProjectionResolution"]["grantKeys"],
json!(["invoice.paid"])
);
assert_eq!(
reconcile_payload["authRightSyncResolution"]["rightKeys"],
json!(["invoice.paid"])
);
}
#[tokio::test]
async fn unsupported_provider_is_reported_before_adapter_execution() {
let pool = PgPoolOptions::new()
.max_connections(1)
.connect_lazy("postgres://postgres:postgres@localhost/postgres")
.expect("lazy pool");
let result =
ingest_billing_webhook_for_connection(&pool, Uuid::nil(), br#"{}"#, &Vec::new()).await;
assert!(matches!(
result,
Err(BillingStoreError::Database(_) | BillingStoreError::NotFound)
));
}
#[test]
fn connection_subject_ref_maps_supported_owner_kinds() {
let subject = billing_subject_ref_for_connection(&BillingProviderConnectionRecord {
id: Uuid::new_v4(),
owner_kind: "workspace".to_string(),
owner_id: "ws_123".to_string(),
provider: "mollie".to_string(),
mode: "test".to_string(),
status: "active".to_string(),
credential_kind: "organization_access_token".to_string(),
provider_account_id: "org_123".to_string(),
provider_profile_id: Some("pfl_123".to_string()),
scopes: json!([]),
config: json!({}),
metadata: json!({}),
created_at: Utc::now(),
updated_at: Utc::now(),
deleted_at: None,
})
.expect("workspace owner kind should map");
assert_eq!(subject.owner_id, "ws_123");
assert!(matches!(
subject.owner_kind,
athena_billing::BillingOwnerKind::Workspace
));
}
#[test]
fn capabilities_are_derived_from_connection_record_scopes() {
let capabilities = billing_capabilities_for_record(&BillingProviderConnectionRecord {
id: Uuid::new_v4(),
owner_kind: "org".to_string(),
owner_id: "org_123".to_string(),
provider: "mollie".to_string(),
mode: "live".to_string(),
status: "active".to_string(),
credential_kind: "organization_access_token".to_string(),
provider_account_id: "org_abc".to_string(),
provider_profile_id: Some("pfl_123".to_string()),
scopes: json!(["payments.read", "payments.write", "profiles.write"]),
config: json!({}),
metadata: json!({}),
created_at: Utc::now(),
updated_at: Utc::now(),
deleted_at: None,
})
.expect("capabilities should derive");
assert!(capabilities.connected);
assert!(capabilities.features.payments);
assert!(
capabilities
.claims
.iter()
.any(|claim| claim == "billing.read")
);
assert!(
capabilities
.claims
.iter()
.any(|claim| claim == "billing.provider.admin")
);
}
#[test]
fn billing_document_kind_maps_canonical_variants() {
let invoice = CanonicalBillingDocument::Invoice(CanonicalInvoice {
provider: athena_billing::BillingProvider::Mollie,
provider_invoice_id: "inv_123".to_string(),
provider_profile_id: None,
provider_customer_id: None,
status: athena_billing::AthenaInvoiceStatus::Paid,
amount: Some(CanonicalMoney {
currency: "EUR".to_string(),
value: "10.00".to_string(),
}),
amount_paid: None,
description: None,
metadata: json!({}),
issued_at: None,
paid_at: None,
created_at: None,
raw: json!({}),
});
assert_eq!(billing_document_kind(&invoice), "invoice");
}
#[test]
fn webhook_event_list_filters_support_uuid_and_pagination() {
let filters = BillingWebhookEventListFilters {
connection_id: Some(Uuid::parse_str("11111111-1111-1111-1111-111111111111").unwrap()),
provider: Some("mollie".to_string()),
limit: 50,
offset: 10,
};
assert_eq!(filters.provider.as_deref(), Some("mollie"));
assert_eq!(filters.limit, 50);
assert_eq!(filters.offset, 10);
assert!(filters.connection_id.is_some());
}
}