use anyhow::{Context, Result, anyhow};
use athena_schema_heal::{
SchemaHealAuditConfig, SchemaHealError, SchemaHealPlan, SchemaHealer, SchemaMutation,
push_standard_bigint_identity_column_mutations,
};
use serde_json::Value;
use sqlx::PgPool;
use std::sync::atomic::{AtomicBool, Ordering};
use crate::drivers::postgresql::sqlx::{insert_row, insert_row_with_schema_coercion};
/// Sink name whose tables are eligible for runtime schema self-heal (compared ASCII
/// case-insensitively by `KnownWebhookSinkTable::from_sink_and_table`).
const MOLLIE_SINK_NAME: &str = "mollie";
/// Schema-qualified name of the table handled by `KnownWebhookSinkTable::MolliePrimary`.
const MOLLIE_PRIMARY_SINK_TABLE: &str = "athena.webhook_sink_events";
/// Schema-qualified name of the table handled by `KnownWebhookSinkTable::MollieProjection`.
const MOLLIE_PROJECTION_SINK_TABLE: &str = "billing.billing_projection_events";
/// Readiness flag for the primary table: cleared when a heal attempt starts and set to `true`
/// only after the heal plan was applied and the readiness query passed.
/// NOTE(review): the code visible in this file only writes this flag; confirm whether a reader
/// exists or is planned.
static MOLLIE_PRIMARY_SINK_READY: AtomicBool = AtomicBool::new(false);
/// Readiness flag for the projection table: cleared when a heal attempt starts and set to
/// `true` only after the heal plan was applied and the readiness query passed.
static MOLLIE_PROJECTION_SINK_READY: AtomicBool = AtomicBool::new(false);
/// Inserts `payload` into `table_name`, running a runtime schema self-heal for known sink
/// tables when the first insert attempt fails.
///
/// * Unknown sink/table combinations are inserted with plain `insert_row` and never healed.
/// * Known tables are inserted with `insert_row_with_schema_coercion`; on failure the matching
///   heal plan is applied and the insert is retried exactly once.
///
/// # Errors
///
/// * The plain insert error for unknown tables.
/// * The heal error, with context naming the sink, the table and the initial insert error.
/// * A combined error describing the initial failure, the heal report and the retry failure
///   when the retry also fails.
pub(crate) async fn insert_row_with_sink_schema_heal(
    sink_name: &str,
    client_name: &str,
    pool: &PgPool,
    table_name: &str,
    payload: &Value,
) -> Result<Value> {
    let Some(known_table) = KnownWebhookSinkTable::from_sink_and_table(sink_name, table_name)
    else {
        return insert_row(pool, table_name, payload).await;
    };

    // Happy path: the table is already usable, no healing required.
    let first_error = match insert_row_with_schema_coercion(pool, table_name, payload).await {
        Ok(row) => return Ok(row),
        Err(error) => error,
    };

    // Keep the initial insert error in the context: without it a heal failure would hide the
    // reason healing was attempted in the first place.
    let report = ensure_known_webhook_sink_table_ready(client_name, pool, known_table)
        .await
        .with_context(|| {
            format!(
                "runtime schema self-heal failed for sink '{sink_name}' table '{table_name}' (initial insert error: {first_error:#})"
            )
        })?;

    insert_row_with_schema_coercion(pool, table_name, payload)
        .await
        .map_err(|retry_error| {
            anyhow!(
                "initial insert failed: {:#}; schema self-heal plan '{}' applied {} mutation(s) and skipped {} mutation(s); retry failed: {:#}",
                first_error,
                report.plan_name,
                report.applied_count(),
                report.skipped_count(),
                retry_error
            )
        })
}
/// Applies the heal plan for `known_table` and confirms the table passes its readiness query.
///
/// The table's cached readiness flag is cleared up front and only set again once both the plan
/// application and the readiness check succeed, so every error path leaves it `false`.
///
/// # Errors
///
/// Returns an error when applying the plan fails, when the readiness query fails, or when the
/// readiness query reports the table as still not ready.
async fn ensure_known_webhook_sink_table_ready(
    client_name: &str,
    pool: &PgPool,
    known_table: KnownWebhookSinkTable,
) -> Result<athena_schema_heal::SchemaHealReport> {
    let readiness = known_table.cached_ready_flag();
    readiness.store(false, Ordering::Release);

    let audit_config = SchemaHealAuditConfig {
        enabled: true,
        actor: "athena_webhook_sink_runtime".to_string(),
    };
    let heal_plan = known_table.plan(client_name);
    let report = SchemaHealer::new(pool)
        .with_audit_config(audit_config)
        .apply_plan(&heal_plan)
        .await
        .map_err(|err| anyhow!(err.to_string()))?;

    // Only log when the plan actually changed something.
    let applied = report.applied_count();
    if applied > 0 {
        tracing::info!(
            target: "athena::webhook_sinks",
            client_name = %client_name,
            table_name = %known_table.table_name(),
            plan = %report.plan_name,
            applied = applied,
            skipped = report.skipped_count(),
            "Applied inbound webhook sink runtime schema self-heal mutations"
        );
    }

    let is_ready = known_table
        .table_ready(pool)
        .await
        .map_err(|err| anyhow!(err.to_string()))?;
    if is_ready {
        readiness.store(true, Ordering::Release);
        Ok(report)
    } else {
        Err(anyhow!(
            "schema self-heal completed for '{}' but the runtime table is still not ready",
            known_table.table_name()
        ))
    }
}
/// Sink tables for which this module carries a runtime schema self-heal plan.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum KnownWebhookSinkTable {
/// The `athena.webhook_sink_events` table of the `mollie` sink.
MolliePrimary,
/// The `billing.billing_projection_events` table of the `mollie` sink.
MollieProjection,
}
impl KnownWebhookSinkTable {
    /// Resolves a sink/table pair to a known table.
    ///
    /// The sink name is compared ASCII case-insensitively; the table name must match the
    /// schema-qualified constant exactly. Anything else yields `None`.
    fn from_sink_and_table(sink_name: &str, table_name: &str) -> Option<Self> {
        let is_mollie_sink = sink_name.eq_ignore_ascii_case(MOLLIE_SINK_NAME);
        match (is_mollie_sink, table_name) {
            (true, MOLLIE_PRIMARY_SINK_TABLE) => Some(Self::MolliePrimary),
            (true, MOLLIE_PROJECTION_SINK_TABLE) => Some(Self::MollieProjection),
            _ => None,
        }
    }

    /// Schema-qualified table name for this variant.
    fn table_name(self) -> &'static str {
        match self {
            Self::MolliePrimary => MOLLIE_PRIMARY_SINK_TABLE,
            Self::MollieProjection => MOLLIE_PROJECTION_SINK_TABLE,
        }
    }

    /// Process-wide readiness flag associated with this variant.
    fn cached_ready_flag(self) -> &'static AtomicBool {
        match self {
            Self::MolliePrimary => &MOLLIE_PRIMARY_SINK_READY,
            Self::MollieProjection => &MOLLIE_PROJECTION_SINK_READY,
        }
    }

    /// Builds the heal plan for this variant on behalf of `client_name`.
    fn plan(self, client_name: &str) -> SchemaHealPlan {
        match self {
            Self::MolliePrimary => mollie_primary_sink_plan(client_name),
            Self::MollieProjection => mollie_projection_sink_plan(client_name),
        }
    }

    /// Runs the readiness query for this variant against `pool`.
    async fn table_ready(self, pool: &PgPool) -> Result<bool, SchemaHealError> {
        match self {
            Self::MolliePrimary => mollie_primary_sink_ready(pool).await,
            Self::MollieProjection => mollie_projection_sink_ready(pool).await,
        }
    }
}
/// Builds the runtime self-heal plan for `athena.webhook_sink_events`.
///
/// Mutations are pushed in this order: the `pgcrypto` extension, the `athena` schema, the table
/// itself, a presence + type check per column, the `id` identity mutations, column defaults,
/// constraints, and finally indexes.
///
/// CHECK constraints are added with `NOT VALID`. Several columns (`sink_name`, `request_path`,
/// `payload_sha256`) are backfilled with `''` when added to a table that already has rows, and a
/// validating `ADD CONSTRAINT ... CHECK (btrim(col) <> '')` would then be rejected by
/// PostgreSQL, failing the heal on exactly the populated tables it is meant to repair. A
/// `NOT VALID` check is still enforced for every row inserted or updated afterwards.
/// NOTE(review): assumes the healer detects an existing constraint by name regardless of its
/// validation state — confirm against `athena_schema_heal`.
fn mollie_primary_sink_plan(client_name: &str) -> SchemaHealPlan {
    let context = athena_schema_heal::SchemaHealContext::new(client_name, "athena")
        .with_table_name("webhook_sink_events")
        .with_object_type("inbound_webhook_sink_runtime");
    let mut plan = SchemaHealPlan::new("mollie_webhook_sink_primary_runtime", context);

    plan.push_mutation(SchemaMutation::ensure_extension(
        "ensure_pgcrypto_extension",
        "pgcrypto",
    ));
    plan.push_mutation(SchemaMutation::ensure_schema(
        "ensure_athena_schema",
        "athena",
    ));
    // The raw SQL below is kept at column 0 so the statement text stays byte-stable.
    plan.push_mutation(SchemaMutation::ensure_table(
        "ensure_webhook_sink_events_table",
        "athena",
        "webhook_sink_events",
        r#"
CREATE TABLE IF NOT EXISTS athena.webhook_sink_events (
id bigint NOT NULL GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
webhook_sink_event_id uuid NOT NULL DEFAULT gen_random_uuid() UNIQUE,
created_at timestamptz NOT NULL DEFAULT now(),
sink_name text NOT NULL,
received_at timestamptz NOT NULL,
request_path text NOT NULL,
http_method text NOT NULL,
source_ip text,
content_type text,
verified boolean NOT NULL DEFAULT false,
event_type text,
external_id text,
query_params_json jsonb NOT NULL DEFAULT '{}'::jsonb,
headers_json jsonb NOT NULL DEFAULT '{}'::jsonb,
payload_json jsonb,
payload_text text,
payload_sha256 text NOT NULL,
metadata jsonb NOT NULL DEFAULT '{}'::jsonb,
CONSTRAINT webhook_sink_events_webhook_sink_event_id_unique UNIQUE (webhook_sink_event_id),
CONSTRAINT webhook_sink_events_sink_name_nonempty CHECK (btrim(sink_name) <> ''),
CONSTRAINT webhook_sink_events_request_path_nonempty CHECK (btrim(request_path) <> ''),
CONSTRAINT webhook_sink_events_http_method_nonempty CHECK (btrim(http_method) <> ''),
CONSTRAINT webhook_sink_events_payload_sha256_nonempty CHECK (btrim(payload_sha256) <> ''),
CONSTRAINT webhook_sink_events_query_params_json_object CHECK (jsonb_typeof(query_params_json) = 'object'),
CONSTRAINT webhook_sink_events_headers_json_object CHECK (jsonb_typeof(headers_json) = 'object'),
CONSTRAINT webhook_sink_events_metadata_object CHECK (jsonb_typeof(metadata) = 'object')
)
"#,
    ));

    // (column, SQL adding the column when missing, expected type, SQL coercing the type).
    // NOTE(review): expected type names mix alias (`timestamptz`) and canonical (`boolean`)
    // spellings — confirm the healer's type comparison accepts both.
    for (column, add_sql, expected_type, alter_sql) in [
        (
            "webhook_sink_event_id",
            "ALTER TABLE athena.webhook_sink_events ADD COLUMN webhook_sink_event_id uuid NOT NULL DEFAULT gen_random_uuid()",
            "uuid",
            "ALTER TABLE athena.webhook_sink_events ALTER COLUMN webhook_sink_event_id TYPE uuid USING webhook_sink_event_id::uuid",
        ),
        (
            "created_at",
            "ALTER TABLE athena.webhook_sink_events ADD COLUMN created_at timestamptz NOT NULL DEFAULT now()",
            "timestamptz",
            "ALTER TABLE athena.webhook_sink_events ALTER COLUMN created_at TYPE timestamptz USING created_at::timestamptz",
        ),
        (
            "sink_name",
            "ALTER TABLE athena.webhook_sink_events ADD COLUMN sink_name text NOT NULL DEFAULT ''",
            "text",
            "ALTER TABLE athena.webhook_sink_events ALTER COLUMN sink_name TYPE text USING sink_name::text",
        ),
        (
            "received_at",
            "ALTER TABLE athena.webhook_sink_events ADD COLUMN received_at timestamptz NOT NULL DEFAULT now()",
            "timestamptz",
            "ALTER TABLE athena.webhook_sink_events ALTER COLUMN received_at TYPE timestamptz USING received_at::timestamptz",
        ),
        (
            "request_path",
            "ALTER TABLE athena.webhook_sink_events ADD COLUMN request_path text NOT NULL DEFAULT ''",
            "text",
            "ALTER TABLE athena.webhook_sink_events ALTER COLUMN request_path TYPE text USING request_path::text",
        ),
        (
            "http_method",
            "ALTER TABLE athena.webhook_sink_events ADD COLUMN http_method text NOT NULL DEFAULT 'POST'",
            "text",
            "ALTER TABLE athena.webhook_sink_events ALTER COLUMN http_method TYPE text USING http_method::text",
        ),
        (
            "source_ip",
            "ALTER TABLE athena.webhook_sink_events ADD COLUMN source_ip text",
            "text",
            "ALTER TABLE athena.webhook_sink_events ALTER COLUMN source_ip TYPE text USING source_ip::text",
        ),
        (
            "content_type",
            "ALTER TABLE athena.webhook_sink_events ADD COLUMN content_type text",
            "text",
            "ALTER TABLE athena.webhook_sink_events ALTER COLUMN content_type TYPE text USING content_type::text",
        ),
        (
            "verified",
            "ALTER TABLE athena.webhook_sink_events ADD COLUMN verified boolean NOT NULL DEFAULT false",
            "boolean",
            "ALTER TABLE athena.webhook_sink_events ALTER COLUMN verified TYPE boolean USING verified::boolean",
        ),
        (
            "event_type",
            "ALTER TABLE athena.webhook_sink_events ADD COLUMN event_type text",
            "text",
            "ALTER TABLE athena.webhook_sink_events ALTER COLUMN event_type TYPE text USING event_type::text",
        ),
        (
            "external_id",
            "ALTER TABLE athena.webhook_sink_events ADD COLUMN external_id text",
            "text",
            "ALTER TABLE athena.webhook_sink_events ALTER COLUMN external_id TYPE text USING external_id::text",
        ),
        (
            "query_params_json",
            "ALTER TABLE athena.webhook_sink_events ADD COLUMN query_params_json jsonb NOT NULL DEFAULT '{}'::jsonb",
            "jsonb",
            "ALTER TABLE athena.webhook_sink_events ALTER COLUMN query_params_json TYPE jsonb USING query_params_json::jsonb",
        ),
        (
            "headers_json",
            "ALTER TABLE athena.webhook_sink_events ADD COLUMN headers_json jsonb NOT NULL DEFAULT '{}'::jsonb",
            "jsonb",
            "ALTER TABLE athena.webhook_sink_events ALTER COLUMN headers_json TYPE jsonb USING headers_json::jsonb",
        ),
        (
            "payload_json",
            "ALTER TABLE athena.webhook_sink_events ADD COLUMN payload_json jsonb",
            "jsonb",
            "ALTER TABLE athena.webhook_sink_events ALTER COLUMN payload_json TYPE jsonb USING payload_json::jsonb",
        ),
        (
            "payload_text",
            "ALTER TABLE athena.webhook_sink_events ADD COLUMN payload_text text",
            "text",
            "ALTER TABLE athena.webhook_sink_events ALTER COLUMN payload_text TYPE text USING payload_text::text",
        ),
        (
            "payload_sha256",
            "ALTER TABLE athena.webhook_sink_events ADD COLUMN payload_sha256 text NOT NULL DEFAULT ''",
            "text",
            "ALTER TABLE athena.webhook_sink_events ALTER COLUMN payload_sha256 TYPE text USING payload_sha256::text",
        ),
        (
            "metadata",
            "ALTER TABLE athena.webhook_sink_events ADD COLUMN metadata jsonb NOT NULL DEFAULT '{}'::jsonb",
            "jsonb",
            "ALTER TABLE athena.webhook_sink_events ALTER COLUMN metadata TYPE jsonb USING metadata::jsonb",
        ),
    ] {
        push_column_and_type_mutations(
            &mut plan,
            "athena",
            "webhook_sink_events",
            column,
            add_sql,
            expected_type,
            alter_sql,
        );
    }

    // The `id` column is handled by the shared bigint-identity helper.
    push_standard_bigint_identity_column_mutations(
        &mut plan,
        "athena",
        "webhook_sink_events",
        "id",
    );

    plan.push_mutation(SchemaMutation::ensure_column_default(
        "ensure_webhook_sink_events_webhook_sink_event_id_default",
        "athena",
        "webhook_sink_events",
        "webhook_sink_event_id",
        "gen_random_uuid()",
        "ALTER TABLE athena.webhook_sink_events ALTER COLUMN webhook_sink_event_id SET DEFAULT gen_random_uuid()",
    ));
    plan.push_mutation(SchemaMutation::ensure_column_default(
        "ensure_webhook_sink_events_created_at_default",
        "athena",
        "webhook_sink_events",
        "created_at",
        "now()",
        "ALTER TABLE athena.webhook_sink_events ALTER COLUMN created_at SET DEFAULT now()",
    ));
    plan.push_mutation(SchemaMutation::ensure_column_default(
        "ensure_webhook_sink_events_verified_default",
        "athena",
        "webhook_sink_events",
        "verified",
        "false",
        "ALTER TABLE athena.webhook_sink_events ALTER COLUMN verified SET DEFAULT false",
    ));

    // (constraint name, SQL adding it). CHECK constraints are NOT VALID so legacy rows —
    // including rows backfilled with '' above — cannot block the heal; UNIQUE has no such option.
    for (constraint, sql) in [
        (
            "webhook_sink_events_webhook_sink_event_id_unique",
            "ALTER TABLE athena.webhook_sink_events ADD CONSTRAINT webhook_sink_events_webhook_sink_event_id_unique UNIQUE (webhook_sink_event_id)",
        ),
        (
            "webhook_sink_events_sink_name_nonempty",
            "ALTER TABLE athena.webhook_sink_events ADD CONSTRAINT webhook_sink_events_sink_name_nonempty CHECK (btrim(sink_name) <> '') NOT VALID",
        ),
        (
            "webhook_sink_events_request_path_nonempty",
            "ALTER TABLE athena.webhook_sink_events ADD CONSTRAINT webhook_sink_events_request_path_nonempty CHECK (btrim(request_path) <> '') NOT VALID",
        ),
        (
            "webhook_sink_events_http_method_nonempty",
            "ALTER TABLE athena.webhook_sink_events ADD CONSTRAINT webhook_sink_events_http_method_nonempty CHECK (btrim(http_method) <> '') NOT VALID",
        ),
        (
            "webhook_sink_events_payload_sha256_nonempty",
            "ALTER TABLE athena.webhook_sink_events ADD CONSTRAINT webhook_sink_events_payload_sha256_nonempty CHECK (btrim(payload_sha256) <> '') NOT VALID",
        ),
        (
            "webhook_sink_events_query_params_json_object",
            "ALTER TABLE athena.webhook_sink_events ADD CONSTRAINT webhook_sink_events_query_params_json_object CHECK (jsonb_typeof(query_params_json) = 'object') NOT VALID",
        ),
        (
            "webhook_sink_events_headers_json_object",
            "ALTER TABLE athena.webhook_sink_events ADD CONSTRAINT webhook_sink_events_headers_json_object CHECK (jsonb_typeof(headers_json) = 'object') NOT VALID",
        ),
        (
            "webhook_sink_events_metadata_object",
            "ALTER TABLE athena.webhook_sink_events ADD CONSTRAINT webhook_sink_events_metadata_object CHECK (jsonb_typeof(metadata) = 'object') NOT VALID",
        ),
    ] {
        plan.push_mutation(SchemaMutation::ensure_constraint(
            format!("ensure_{constraint}"),
            "athena",
            "webhook_sink_events",
            constraint,
            sql,
        ));
    }

    // (index name, SQL creating it).
    for (index_name, sql) in [
        (
            "webhook_sink_events_sink_received_idx",
            "CREATE INDEX IF NOT EXISTS webhook_sink_events_sink_received_idx ON athena.webhook_sink_events (sink_name, received_at DESC)",
        ),
        (
            "webhook_sink_events_event_type_idx",
            "CREATE INDEX IF NOT EXISTS webhook_sink_events_event_type_idx ON athena.webhook_sink_events (event_type, created_at DESC)",
        ),
        (
            "webhook_sink_events_external_id_idx",
            "CREATE INDEX IF NOT EXISTS webhook_sink_events_external_id_idx ON athena.webhook_sink_events (external_id, created_at DESC) WHERE external_id IS NOT NULL",
        ),
    ] {
        plan.push_mutation(SchemaMutation::ensure_index(
            format!("ensure_{index_name}"),
            "athena",
            "webhook_sink_events",
            index_name,
            sql,
        ));
    }

    plan
}
/// Builds the runtime self-heal plan for `billing.billing_projection_events`.
///
/// Mutations are pushed in this order: the `pgcrypto` extension, the `billing` schema, the
/// table itself, a presence + type check per column, the `id` identity mutations, column
/// defaults, constraints, and finally indexes.
///
/// CHECK constraints are added with `NOT VALID`. `sink_name` is backfilled with `''` when added
/// to a table that already has rows, and a validating
/// `ADD CONSTRAINT ... CHECK (btrim(sink_name) <> '')` would then be rejected by PostgreSQL,
/// failing the heal on exactly the populated tables it is meant to repair. A `NOT VALID` check
/// is still enforced for every row inserted or updated afterwards.
/// NOTE(review): assumes the healer detects an existing constraint by name regardless of its
/// validation state — confirm against `athena_schema_heal`.
fn mollie_projection_sink_plan(client_name: &str) -> SchemaHealPlan {
    let context = athena_schema_heal::SchemaHealContext::new(client_name, "billing")
        .with_table_name("billing_projection_events")
        .with_object_type("billing_projection_sink_runtime");
    let mut plan = SchemaHealPlan::new("mollie_webhook_sink_projection_runtime", context);

    plan.push_mutation(SchemaMutation::ensure_extension(
        "ensure_pgcrypto_extension",
        "pgcrypto",
    ));
    plan.push_mutation(SchemaMutation::ensure_schema(
        "ensure_billing_schema",
        "billing",
    ));
    // The raw SQL below is kept at column 0 so the statement text stays byte-stable.
    plan.push_mutation(SchemaMutation::ensure_table(
        "ensure_billing_projection_events_table",
        "billing",
        "billing_projection_events",
        r#"
CREATE TABLE IF NOT EXISTS billing.billing_projection_events (
id bigint NOT NULL GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
billing_projection_event_id uuid NOT NULL DEFAULT gen_random_uuid() UNIQUE,
created_at timestamptz NOT NULL DEFAULT now(),
provider text NOT NULL,
sink_name text NOT NULL,
event_type text,
external_id text,
verified boolean NOT NULL DEFAULT false,
received_at timestamptz NOT NULL,
payload_json jsonb,
metadata jsonb NOT NULL DEFAULT '{}'::jsonb,
CONSTRAINT billing_projection_events_billing_projection_event_id_unique UNIQUE (billing_projection_event_id),
CONSTRAINT billing_projection_events_provider_nonempty CHECK (btrim(provider) <> ''),
CONSTRAINT billing_projection_events_sink_name_nonempty CHECK (btrim(sink_name) <> ''),
CONSTRAINT billing_projection_events_metadata_object CHECK (jsonb_typeof(metadata) = 'object')
)
"#,
    ));

    // (column, SQL adding the column when missing, expected type, SQL coercing the type).
    // NOTE(review): expected type names mix alias (`timestamptz`) and canonical (`boolean`)
    // spellings — confirm the healer's type comparison accepts both.
    for (column, add_sql, expected_type, alter_sql) in [
        (
            "billing_projection_event_id",
            "ALTER TABLE billing.billing_projection_events ADD COLUMN billing_projection_event_id uuid NOT NULL DEFAULT gen_random_uuid()",
            "uuid",
            "ALTER TABLE billing.billing_projection_events ALTER COLUMN billing_projection_event_id TYPE uuid USING billing_projection_event_id::uuid",
        ),
        (
            "created_at",
            "ALTER TABLE billing.billing_projection_events ADD COLUMN created_at timestamptz NOT NULL DEFAULT now()",
            "timestamptz",
            "ALTER TABLE billing.billing_projection_events ALTER COLUMN created_at TYPE timestamptz USING created_at::timestamptz",
        ),
        (
            "provider",
            "ALTER TABLE billing.billing_projection_events ADD COLUMN provider text NOT NULL DEFAULT 'mollie'",
            "text",
            "ALTER TABLE billing.billing_projection_events ALTER COLUMN provider TYPE text USING provider::text",
        ),
        (
            "sink_name",
            "ALTER TABLE billing.billing_projection_events ADD COLUMN sink_name text NOT NULL DEFAULT ''",
            "text",
            "ALTER TABLE billing.billing_projection_events ALTER COLUMN sink_name TYPE text USING sink_name::text",
        ),
        (
            "event_type",
            "ALTER TABLE billing.billing_projection_events ADD COLUMN event_type text",
            "text",
            "ALTER TABLE billing.billing_projection_events ALTER COLUMN event_type TYPE text USING event_type::text",
        ),
        (
            "external_id",
            "ALTER TABLE billing.billing_projection_events ADD COLUMN external_id text",
            "text",
            "ALTER TABLE billing.billing_projection_events ALTER COLUMN external_id TYPE text USING external_id::text",
        ),
        (
            "verified",
            "ALTER TABLE billing.billing_projection_events ADD COLUMN verified boolean NOT NULL DEFAULT false",
            "boolean",
            "ALTER TABLE billing.billing_projection_events ALTER COLUMN verified TYPE boolean USING verified::boolean",
        ),
        (
            "received_at",
            "ALTER TABLE billing.billing_projection_events ADD COLUMN received_at timestamptz NOT NULL DEFAULT now()",
            "timestamptz",
            "ALTER TABLE billing.billing_projection_events ALTER COLUMN received_at TYPE timestamptz USING received_at::timestamptz",
        ),
        (
            "payload_json",
            "ALTER TABLE billing.billing_projection_events ADD COLUMN payload_json jsonb",
            "jsonb",
            "ALTER TABLE billing.billing_projection_events ALTER COLUMN payload_json TYPE jsonb USING payload_json::jsonb",
        ),
        (
            "metadata",
            "ALTER TABLE billing.billing_projection_events ADD COLUMN metadata jsonb NOT NULL DEFAULT '{}'::jsonb",
            "jsonb",
            "ALTER TABLE billing.billing_projection_events ALTER COLUMN metadata TYPE jsonb USING metadata::jsonb",
        ),
    ] {
        push_column_and_type_mutations(
            &mut plan,
            "billing",
            "billing_projection_events",
            column,
            add_sql,
            expected_type,
            alter_sql,
        );
    }

    // The `id` column is handled by the shared bigint-identity helper.
    push_standard_bigint_identity_column_mutations(
        &mut plan,
        "billing",
        "billing_projection_events",
        "id",
    );

    plan.push_mutation(SchemaMutation::ensure_column_default(
        "ensure_billing_projection_events_billing_projection_event_id_default",
        "billing",
        "billing_projection_events",
        "billing_projection_event_id",
        "gen_random_uuid()",
        "ALTER TABLE billing.billing_projection_events ALTER COLUMN billing_projection_event_id SET DEFAULT gen_random_uuid()",
    ));
    plan.push_mutation(SchemaMutation::ensure_column_default(
        "ensure_billing_projection_events_created_at_default",
        "billing",
        "billing_projection_events",
        "created_at",
        "now()",
        "ALTER TABLE billing.billing_projection_events ALTER COLUMN created_at SET DEFAULT now()",
    ));
    plan.push_mutation(SchemaMutation::ensure_column_default(
        "ensure_billing_projection_events_verified_default",
        "billing",
        "billing_projection_events",
        "verified",
        "false",
        "ALTER TABLE billing.billing_projection_events ALTER COLUMN verified SET DEFAULT false",
    ));

    // (constraint name, SQL adding it). CHECK constraints are NOT VALID so legacy rows —
    // including rows backfilled with '' above — cannot block the heal; UNIQUE has no such option.
    for (constraint, sql) in [
        (
            "billing_projection_events_billing_projection_event_id_unique",
            "ALTER TABLE billing.billing_projection_events ADD CONSTRAINT billing_projection_events_billing_projection_event_id_unique UNIQUE (billing_projection_event_id)",
        ),
        (
            "billing_projection_events_provider_nonempty",
            "ALTER TABLE billing.billing_projection_events ADD CONSTRAINT billing_projection_events_provider_nonempty CHECK (btrim(provider) <> '') NOT VALID",
        ),
        (
            "billing_projection_events_sink_name_nonempty",
            "ALTER TABLE billing.billing_projection_events ADD CONSTRAINT billing_projection_events_sink_name_nonempty CHECK (btrim(sink_name) <> '') NOT VALID",
        ),
        (
            "billing_projection_events_metadata_object",
            "ALTER TABLE billing.billing_projection_events ADD CONSTRAINT billing_projection_events_metadata_object CHECK (jsonb_typeof(metadata) = 'object') NOT VALID",
        ),
    ] {
        plan.push_mutation(SchemaMutation::ensure_constraint(
            format!("ensure_{constraint}"),
            "billing",
            "billing_projection_events",
            constraint,
            sql,
        ));
    }

    // (index name, SQL creating it).
    for (index_name, sql) in [
        (
            "billing_projection_events_provider_received_idx",
            "CREATE INDEX IF NOT EXISTS billing_projection_events_provider_received_idx ON billing.billing_projection_events (provider, received_at DESC)",
        ),
        (
            "billing_projection_events_event_type_idx",
            "CREATE INDEX IF NOT EXISTS billing_projection_events_event_type_idx ON billing.billing_projection_events (event_type, created_at DESC)",
        ),
        (
            "billing_projection_events_external_id_idx",
            "CREATE INDEX IF NOT EXISTS billing_projection_events_external_id_idx ON billing.billing_projection_events (external_id, created_at DESC) WHERE external_id IS NOT NULL",
        ),
    ] {
        plan.push_mutation(SchemaMutation::ensure_index(
            format!("ensure_{index_name}"),
            "billing",
            "billing_projection_events",
            index_name,
            sql,
        ));
    }

    plan
}
/// Appends two mutations for `column` to `plan`: one ensuring the column exists (running
/// `add_sql` otherwise) and one ensuring it has `expected_type` (running `alter_sql` otherwise).
///
/// Labels follow the pattern `ensure_{table}_{column}_column` and `ensure_{table}_{column}_type`.
fn push_column_and_type_mutations(
    plan: &mut SchemaHealPlan,
    schema: &str,
    table: &str,
    column: &str,
    add_sql: &str,
    expected_type: &str,
    alter_sql: &str,
) {
    let presence_label = format!("ensure_{table}_{column}_column");
    let type_label = format!("ensure_{table}_{column}_type");

    let ensure_present =
        SchemaMutation::ensure_column(presence_label, schema, table, column, add_sql);
    let ensure_typed = SchemaMutation::ensure_column_type(
        type_label,
        schema,
        table,
        column,
        expected_type,
        alter_sql,
    );

    // Presence must be queued before the type check so the column exists when it is inspected.
    plan.push_mutation(ensure_present);
    plan.push_mutation(ensure_typed);
}
/// Reports whether `athena.webhook_sink_events` looks usable for inserts.
///
/// The query requires the table to exist, the `sink_name`, `verified` and `payload_sha256`
/// columns to be present, and the `id` column to be an identity column or to have a default.
///
/// # Errors
///
/// Wraps any database error in a `SchemaHealError`.
async fn mollie_primary_sink_ready(pool: &PgPool) -> Result<bool, SchemaHealError> {
    // SQL text kept at column 0 so the statement stays byte-stable.
    const READINESS_SQL: &str = r#"
SELECT
to_regclass('athena.webhook_sink_events') IS NOT NULL
AND EXISTS (
SELECT 1
FROM information_schema.columns
WHERE table_schema = 'athena'
AND table_name = 'webhook_sink_events'
AND column_name = 'sink_name'
)
AND EXISTS (
SELECT 1
FROM information_schema.columns
WHERE table_schema = 'athena'
AND table_name = 'webhook_sink_events'
AND column_name = 'verified'
)
AND EXISTS (
SELECT 1
FROM information_schema.columns
WHERE table_schema = 'athena'
AND table_name = 'webhook_sink_events'
AND column_name = 'payload_sha256'
)
AND EXISTS (
SELECT 1
FROM information_schema.columns
WHERE table_schema = 'athena'
AND table_name = 'webhook_sink_events'
AND column_name = 'id'
AND (is_identity = 'YES' OR column_default IS NOT NULL)
)
"#;

    match sqlx::query_scalar::<_, bool>(READINESS_SQL)
        .fetch_one(pool)
        .await
    {
        Ok(is_ready) => Ok(is_ready),
        Err(err) => Err(SchemaHealError::new(format!(
            "failed to verify mollie primary webhook sink readiness: {err}"
        ))),
    }
}
/// Reports whether `billing.billing_projection_events` looks usable for inserts.
///
/// The query requires the table to exist, the `provider`, `sink_name` and `received_at`
/// columns to be present, and the `id` column to be an identity column or to have a default.
///
/// # Errors
///
/// Wraps any database error in a `SchemaHealError`.
async fn mollie_projection_sink_ready(pool: &PgPool) -> Result<bool, SchemaHealError> {
    // SQL text kept at column 0 so the statement stays byte-stable.
    const READINESS_SQL: &str = r#"
SELECT
to_regclass('billing.billing_projection_events') IS NOT NULL
AND EXISTS (
SELECT 1
FROM information_schema.columns
WHERE table_schema = 'billing'
AND table_name = 'billing_projection_events'
AND column_name = 'provider'
)
AND EXISTS (
SELECT 1
FROM information_schema.columns
WHERE table_schema = 'billing'
AND table_name = 'billing_projection_events'
AND column_name = 'sink_name'
)
AND EXISTS (
SELECT 1
FROM information_schema.columns
WHERE table_schema = 'billing'
AND table_name = 'billing_projection_events'
AND column_name = 'received_at'
)
AND EXISTS (
SELECT 1
FROM information_schema.columns
WHERE table_schema = 'billing'
AND table_name = 'billing_projection_events'
AND column_name = 'id'
AND (is_identity = 'YES' OR column_default IS NOT NULL)
)
"#;

    match sqlx::query_scalar::<_, bool>(READINESS_SQL)
        .fetch_one(pool)
        .await
    {
        Ok(is_ready) => Ok(is_ready),
        Err(err) => Err(SchemaHealError::new(format!(
            "failed to verify mollie projection sink readiness: {err}"
        ))),
    }
}
#[cfg(test)]
mod tests {
    use super::{KnownWebhookSinkTable, mollie_primary_sink_plan, mollie_projection_sink_plan};
    use athena_schema_heal::{SchemaHealPlan, SchemaMutation};

    /// Returns the label carried by a mutation, whatever its variant.
    ///
    /// The match is deliberately exhaustive so a new `SchemaMutation` variant forces an update.
    fn label_of(mutation: &SchemaMutation) -> &str {
        match mutation {
            SchemaMutation::EnsureExtension(inner) => inner.label.as_str(),
            SchemaMutation::EnsureSchema(inner) => inner.label.as_str(),
            SchemaMutation::EnsureTable(inner) => inner.label.as_str(),
            SchemaMutation::EnsureColumn(inner) => inner.label.as_str(),
            SchemaMutation::EnsureColumnNotNull(inner) => inner.label.as_str(),
            SchemaMutation::EnsureColumnDefault(inner) => inner.label.as_str(),
            SchemaMutation::EnsureColumnGenerated(inner) => inner.label.as_str(),
            SchemaMutation::EnsureColumnType(inner) => inner.label.as_str(),
            SchemaMutation::EnsureConstraint(inner) => inner.label.as_str(),
            SchemaMutation::DropConstraint(inner) => inner.label.as_str(),
            SchemaMutation::EnsureIndex(inner) => inner.label.as_str(),
            SchemaMutation::DropIndex(inner) => inner.label.as_str(),
            SchemaMutation::DropColumn(inner) => inner.label.as_str(),
            SchemaMutation::Statement(inner) => inner.label.as_str(),
        }
    }

    /// Asserts that every label in `expected` appears among the plan's mutation labels.
    fn assert_plan_has_labels(plan: &SchemaHealPlan, expected: &[&str]) {
        let labels: Vec<&str> = plan
            .mutations
            .iter()
            .map(|mutation| label_of(mutation))
            .collect();
        for label in expected {
            assert!(labels.contains(label), "missing mutation label {label}");
        }
    }

    #[test]
    fn known_table_resolution_is_scoped_to_mollie() {
        let resolve = KnownWebhookSinkTable::from_sink_and_table;

        assert_eq!(
            resolve("mollie", "athena.webhook_sink_events"),
            Some(KnownWebhookSinkTable::MolliePrimary)
        );
        assert_eq!(
            resolve("mollie", "billing.billing_projection_events"),
            Some(KnownWebhookSinkTable::MollieProjection)
        );
        assert_eq!(resolve("stripe", "athena.webhook_sink_events"), None);
    }

    #[test]
    fn primary_plan_covers_constraints_and_indexes() {
        let plan = mollie_primary_sink_plan("suits-formations");
        assert_plan_has_labels(
            &plan,
            &[
                "ensure_pgcrypto_extension",
                "ensure_athena_schema",
                "ensure_webhook_sink_events_table",
                "ensure_webhook_sink_events_sink_name_nonempty",
                "ensure_webhook_sink_events_metadata_object",
                "ensure_webhook_sink_events_sink_received_idx",
            ],
        );
    }

    #[test]
    fn projection_plan_covers_constraints_and_indexes() {
        let plan = mollie_projection_sink_plan("suits-formations");
        assert_plan_has_labels(
            &plan,
            &[
                "ensure_pgcrypto_extension",
                "ensure_billing_schema",
                "ensure_billing_projection_events_table",
                "ensure_billing_projection_events_provider_nonempty",
                "ensure_billing_projection_events_metadata_object",
                "ensure_billing_projection_events_provider_received_idx",
            ],
        );
    }
}