use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use sqlx::FromRow;
use sqlx::postgres::PgPool;
use thiserror::Error;
/// Errors returned by the webhook store functions in this module.
#[derive(Debug, Error)]
pub enum WebhookStoreError {
/// A query failed; wraps the underlying `sqlx::Error` (converted automatically by `?`).
#[error("database error: {0}")]
Database(#[from] sqlx::Error),
/// No `gateway_webhook` row matched the requested id.
#[error("webhook not found")]
NotFound,
}
/// One row of the `gateway_webhook` table, as selected by the queries in this module.
///
/// Field names must match the column names because rows are decoded through `FromRow`.
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
pub struct GatewayWebhookRecord {
/// Row id; the key used by the get, patch and delete functions.
pub id: i64,
pub created_at: DateTime<Utc>,
/// Set to `now()` by the upsert conflict branch and by `patch_gateway_webhook`.
pub updated_at: DateTime<Utc>,
/// Only rows with `enabled = true` are returned by `list_webhooks_for_dispatch`.
pub enabled: bool,
pub name: String,
pub description: Option<String>,
/// Optional instance restriction. NULL matches any instance in dispatch lookups;
/// otherwise it is compared case-insensitively with trailing `/` characters ignored.
pub athena_base_url: Option<String>,
/// Matched exactly by dispatch lookups and by the filtered list/count functions.
pub client_name: String,
/// Optional table restriction. NULL matches any table in dispatch lookups;
/// otherwise it must equal the table supplied with the event.
pub table_name: Option<String>,
/// Matched exactly by dispatch lookups.
pub route_key: String,
// NOTE(review): presumably the method and URL template of the outbound request;
// rendering and sending are not part of this file. Both are part of the upsert conflict key.
pub http_method: String,
pub url_template: String,
/// JSON value stored as-is; its expected shape is not visible in this file.
pub headers_templates: Value,
pub body_template: Option<String>,
// NOTE(review): presumably a request timeout in milliseconds; confirm with the dispatcher.
pub timeout_ms: i32,
pub include_request_body_in_context: bool,
/// JSON value stored as-is; its expected shape is not visible in this file.
pub lookup_keys: Value,
/// Optional slug; an upsert that supplies NULL keeps the slug already stored.
pub slug: Option<String>,
}
/// One row of the `gateway_webhook_delivery` table, as selected by `list_webhook_deliveries`.
///
/// Field names must match the column names because rows are decoded through `FromRow`.
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
pub struct GatewayWebhookDeliveryRecord {
pub id: i64,
pub created_at: DateTime<Utc>,
/// Id of the `gateway_webhook` row this delivery belongs to.
pub webhook_id: i64,
pub trigger_route_key: String,
pub request_id: Option<String>,
/// Conflict target on insert: a second insert with the same key is silently skipped.
pub idempotency_key: String,
/// Inserted as `'pending'`; later overwritten by `update_webhook_delivery_outcome`.
pub status: String,
// The remaining optional columns are not set on insert; they are written by
// `update_webhook_delivery_outcome`.
pub http_status: Option<i32>,
pub response_headers_json: Option<Value>,
pub response_body_snippet: Option<String>,
pub error_message: Option<String>,
pub duration_ms: Option<i64>,
pub resolved_url: Option<String>,
/// JSON value captured at insert time; its shape is not visible in this file.
pub context_snapshot: Value,
}
/// Input for `upsert_gateway_webhook`: one value for every writable `gateway_webhook` column.
///
/// The fields mirror the same-named fields of `GatewayWebhookRecord`.
#[derive(Debug, Clone)]
pub struct CreateGatewayWebhookParams {
pub name: String,
pub description: Option<String>,
pub enabled: bool,
pub athena_base_url: Option<String>,
pub client_name: String,
pub table_name: Option<String>,
pub route_key: String,
pub http_method: String,
pub url_template: String,
pub headers_templates: Value,
pub body_template: Option<String>,
pub timeout_ms: i32,
pub include_request_body_in_context: bool,
pub lookup_keys: Value,
/// When `None` and the upsert hits an existing row, the stored slug is kept.
pub slug: Option<String>,
}
/// Partial update for `patch_gateway_webhook`.
///
/// For every field, an outer `None` means "keep the stored value". For fields typed
/// `Option<Option<String>>` (nullable columns), `Some(None)` writes NULL and
/// `Some(Some(v))` writes `v`. `Default` yields a patch that changes no field.
#[derive(Debug, Clone, Default)]
pub struct PatchGatewayWebhookParams {
pub name: Option<String>,
pub description: Option<Option<String>>,
pub enabled: Option<bool>,
pub athena_base_url: Option<Option<String>>,
pub client_name: Option<String>,
pub table_name: Option<Option<String>>,
pub route_key: Option<String>,
pub http_method: Option<String>,
pub url_template: Option<String>,
pub headers_templates: Option<Value>,
pub body_template: Option<Option<String>>,
pub timeout_ms: Option<i32>,
pub include_request_body_in_context: Option<bool>,
pub lookup_keys: Option<Value>,
pub slug: Option<Option<String>>,
}
pub async fn list_webhooks_for_dispatch(
pool: &PgPool,
client_name: &str,
route_key: &str,
table_from_event: Option<&str>,
instance_base_url: Option<&str>,
) -> Result<Vec<GatewayWebhookRecord>, WebhookStoreError> {
let rows: Vec<GatewayWebhookRecord> = sqlx::query_as::<_, GatewayWebhookRecord>(
r#"
SELECT
id, created_at, updated_at, enabled, name, description, athena_base_url,
client_name, table_name, route_key, http_method, url_template,
headers_templates, body_template, timeout_ms, include_request_body_in_context,
lookup_keys, slug
FROM gateway_webhook
WHERE enabled = true
AND client_name = $1
AND route_key = $2
AND (
table_name IS NULL
OR ($3::text IS NOT NULL AND table_name = $3)
)
AND (
athena_base_url IS NULL
OR (
$4::text IS NOT NULL
AND lower(rtrim(athena_base_url, '/')) = lower(rtrim($4::text, '/'))
)
)
ORDER BY id ASC
"#,
)
.bind(client_name)
.bind(route_key)
.bind(table_from_event)
.bind(instance_base_url)
.fetch_all(pool)
.await?;
Ok(rows)
}
/// Lists one page of webhooks across all clients, newest id first.
///
/// Delegates to [`list_gateway_webhooks_filtered`] with no client filter.
///
/// # Errors
/// Returns [`WebhookStoreError::Database`] if the query fails.
pub async fn list_gateway_webhooks(
    pool: &PgPool,
    limit: i64,
    offset: i64,
) -> Result<Vec<GatewayWebhookRecord>, WebhookStoreError> {
    let no_client_filter: Option<&str> = None;
    list_gateway_webhooks_filtered(pool, no_client_filter, limit, offset).await
}
/// Lists one page of webhooks, newest id first, optionally restricted to one client.
///
/// `client_name` of `None` or `Some("")` means "no filter". `limit` and `offset` are
/// passed straight to the SQL `LIMIT` / `OFFSET` clauses.
///
/// # Errors
/// Returns [`WebhookStoreError::Database`] if the query fails.
pub async fn list_gateway_webhooks_filtered(
    pool: &PgPool,
    client_name: Option<&str>,
    limit: i64,
    offset: i64,
) -> Result<Vec<GatewayWebhookRecord>, WebhookStoreError> {
    const BY_CLIENT_SQL: &str = r#"
SELECT
id, created_at, updated_at, enabled, name, description, athena_base_url,
client_name, table_name, route_key, http_method, url_template,
headers_templates, body_template, timeout_ms, include_request_body_in_context,
lookup_keys, slug
FROM gateway_webhook
WHERE client_name = $1
ORDER BY id DESC
LIMIT $2 OFFSET $3
"#;
    const ALL_CLIENTS_SQL: &str = r#"
SELECT
id, created_at, updated_at, enabled, name, description, athena_base_url,
client_name, table_name, route_key, http_method, url_template,
headers_templates, body_template, timeout_ms, include_request_body_in_context,
lookup_keys, slug
FROM gateway_webhook
ORDER BY id DESC
LIMIT $1 OFFSET $2
"#;

    let page = match client_name {
        // An empty filter string falls through to the unfiltered query below.
        Some(wanted) if !wanted.is_empty() => {
            sqlx::query_as::<_, GatewayWebhookRecord>(BY_CLIENT_SQL)
                .bind(wanted)
                .bind(limit)
                .bind(offset)
                .fetch_all(pool)
                .await
        }
        _ => {
            sqlx::query_as::<_, GatewayWebhookRecord>(ALL_CLIENTS_SQL)
                .bind(limit)
                .bind(offset)
                .fetch_all(pool)
                .await
        }
    };

    Ok(page?)
}
/// Loads a single webhook by id.
///
/// # Errors
/// Returns [`WebhookStoreError::NotFound`] when no row has this id, and
/// [`WebhookStoreError::Database`] if the query fails.
pub async fn get_gateway_webhook(
    pool: &PgPool,
    id: i64,
) -> Result<GatewayWebhookRecord, WebhookStoreError> {
    const SELECT_BY_ID_SQL: &str = r#"
SELECT
id, created_at, updated_at, enabled, name, description, athena_base_url,
client_name, table_name, route_key, http_method, url_template,
headers_templates, body_template, timeout_ms, include_request_body_in_context,
lookup_keys, slug
FROM gateway_webhook
WHERE id = $1
"#;

    let found: Option<GatewayWebhookRecord> = sqlx::query_as(SELECT_BY_ID_SQL)
        .bind(id)
        .fetch_optional(pool)
        .await?;

    match found {
        Some(webhook) => Ok(webhook),
        None => Err(WebhookStoreError::NotFound),
    }
}
/// Inserts a webhook, or updates the existing one that has the same identity, and
/// returns the resulting row.
///
/// The conflict target is `(client_name, route_key, http_method, url_template,
/// athena_base_url_norm, table_name_norm)`. The two `_norm` columns are not written
/// here, so they are presumably derived by the schema; confirm against the migration.
///
/// On conflict every non-key column is overwritten with the new value and
/// `updated_at` is set to `now()`, with one exception: a NULL `slug` in `params`
/// keeps the slug already stored.
///
/// # Errors
/// Returns [`WebhookStoreError::Database`] if the statement fails.
pub async fn upsert_gateway_webhook(
    pool: &PgPool,
    params: CreateGatewayWebhookParams,
) -> Result<GatewayWebhookRecord, WebhookStoreError> {
    let record: GatewayWebhookRecord = sqlx::query_as::<_, GatewayWebhookRecord>(
        r#"
INSERT INTO gateway_webhook (
enabled, name, description, athena_base_url, client_name, table_name,
route_key, http_method, url_template, headers_templates, body_template,
timeout_ms, include_request_body_in_context, lookup_keys, slug
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
ON CONFLICT (
client_name, route_key, http_method, url_template,
athena_base_url_norm, table_name_norm
)
DO UPDATE SET
updated_at = now(),
enabled = EXCLUDED.enabled,
name = EXCLUDED.name,
description = EXCLUDED.description,
athena_base_url = EXCLUDED.athena_base_url,
table_name = EXCLUDED.table_name,
headers_templates = EXCLUDED.headers_templates,
body_template = EXCLUDED.body_template,
timeout_ms = EXCLUDED.timeout_ms,
include_request_body_in_context = EXCLUDED.include_request_body_in_context,
lookup_keys = EXCLUDED.lookup_keys,
slug = COALESCE(EXCLUDED.slug, gateway_webhook.slug)
RETURNING
id, created_at, updated_at, enabled, name, description, athena_base_url,
client_name, table_name, route_key, http_method, url_template,
headers_templates, body_template, timeout_ms, include_request_body_in_context,
lookup_keys, slug
"#,
    )
    // Bind order must follow the column list of the INSERT above ($1..$15).
    .bind(params.enabled)
    .bind(&params.name)
    .bind(&params.description)
    .bind(&params.athena_base_url)
    .bind(&params.client_name)
    .bind(&params.table_name)
    .bind(&params.route_key)
    .bind(&params.http_method)
    .bind(&params.url_template)
    .bind(&params.headers_templates)
    .bind(&params.body_template)
    .bind(params.timeout_ms)
    .bind(params.include_request_body_in_context)
    .bind(&params.lookup_keys)
    .bind(&params.slug)
    .fetch_one(pool)
    .await?;
    Ok(record)
}
pub async fn patch_gateway_webhook(
pool: &PgPool,
id: i64,
patch: PatchGatewayWebhookParams,
) -> Result<GatewayWebhookRecord, WebhookStoreError> {
let existing: GatewayWebhookRecord = get_gateway_webhook(pool, id).await?;
let name: String = patch.name.unwrap_or(existing.name);
let description: Option<String> = match &patch.description {
Some(d) => d.clone(),
None => existing.description.clone(),
};
let enabled: bool = patch.enabled.unwrap_or(existing.enabled);
let athena_base_url: Option<String> = match &patch.athena_base_url {
Some(u) => u.clone(),
None => existing.athena_base_url.clone(),
};
let client_name: String = patch.client_name.unwrap_or(existing.client_name);
let table_name: Option<String> = match &patch.table_name {
Some(t) => t.clone(),
None => existing.table_name.clone(),
};
let route_key: String = patch.route_key.unwrap_or(existing.route_key);
let http_method: String = patch.http_method.unwrap_or(existing.http_method);
let url_template: String = patch.url_template.unwrap_or(existing.url_template);
let headers_templates: Value = patch
.headers_templates
.unwrap_or(existing.headers_templates.clone());
let body_template: Option<String> = match &patch.body_template {
Some(b) => b.clone(),
None => existing.body_template.clone(),
};
let timeout_ms: i32 = patch.timeout_ms.unwrap_or(existing.timeout_ms);
let include_request_body_in_context: bool = patch
.include_request_body_in_context
.unwrap_or(existing.include_request_body_in_context);
let lookup_keys: Value = patch.lookup_keys.unwrap_or(existing.lookup_keys.clone());
let slug: Option<String> = match &patch.slug {
Some(s) => s.clone(),
None => existing.slug.clone(),
};
sqlx::query(
r#"
UPDATE gateway_webhook SET
updated_at = now(),
name = $1,
description = $2,
enabled = $3,
athena_base_url = $4,
client_name = $5,
table_name = $6,
route_key = $7,
http_method = $8,
url_template = $9,
headers_templates = $10,
body_template = $11,
timeout_ms = $12,
include_request_body_in_context = $13,
lookup_keys = $14,
slug = $15
WHERE id = $16
"#,
)
.bind(&name)
.bind(&description)
.bind(enabled)
.bind(&athena_base_url)
.bind(&client_name)
.bind(&table_name)
.bind(&route_key)
.bind(&http_method)
.bind(&url_template)
.bind(&headers_templates)
.bind(&body_template)
.bind(timeout_ms)
.bind(include_request_body_in_context)
.bind(&lookup_keys)
.bind(&slug)
.bind(id)
.execute(pool)
.await?;
get_gateway_webhook(pool, id).await
}
/// Deletes the webhook with the given `id`.
///
/// Returns `true` when a row was removed and `false` when no row had this id.
///
/// # Errors
/// Returns [`WebhookStoreError::Database`] if the statement fails.
pub async fn delete_gateway_webhook(pool: &PgPool, id: i64) -> Result<bool, WebhookStoreError> {
    let outcome = sqlx::query("DELETE FROM gateway_webhook WHERE id = $1")
        .bind(id)
        .execute(pool)
        .await?;
    let removed_rows = outcome.rows_affected();
    Ok(removed_rows != 0)
}
/// Records a new delivery attempt with status `'pending'`.
///
/// Returns `Some(id)` of the inserted row, or `None` when a delivery with the same
/// `idempotency_key` already exists (the insert is skipped in that case).
///
/// # Errors
/// Returns [`WebhookStoreError::Database`] if the statement fails.
pub async fn insert_webhook_delivery_pending(
    pool: &PgPool,
    webhook_id: i64,
    trigger_route_key: &str,
    request_id: Option<&str>,
    idempotency_key: &str,
    context_snapshot: Value,
) -> Result<Option<i64>, WebhookStoreError> {
    const INSERT_PENDING_SQL: &str = r#"
INSERT INTO gateway_webhook_delivery (
webhook_id, trigger_route_key, request_id, idempotency_key,
status, context_snapshot
)
VALUES ($1, $2, $3, $4, 'pending', $5)
ON CONFLICT (idempotency_key) DO NOTHING
RETURNING id
"#;

    let inserted: Option<(i64,)> = sqlx::query_as(INSERT_PENDING_SQL)
        .bind(webhook_id)
        .bind(trigger_route_key)
        .bind(request_id)
        .bind(idempotency_key)
        .bind(&context_snapshot)
        .fetch_optional(pool)
        .await?;

    // No returned row means the idempotency key was already present.
    Ok(inserted.map(|(delivery_id,)| delivery_id))
}
pub async fn update_webhook_delivery_outcome(
pool: &PgPool,
delivery_id: i64,
status: &str,
http_status: Option<i32>,
response_headers_json: Option<Value>,
response_body_snippet: Option<&str>,
error_message: Option<&str>,
duration_ms: Option<i64>,
resolved_url: Option<&str>,
) -> Result<(), WebhookStoreError> {
sqlx::query(
r#"
UPDATE gateway_webhook_delivery SET
status = $1,
http_status = $2,
response_headers_json = $3,
response_body_snippet = $4,
error_message = $5,
duration_ms = $6,
resolved_url = $7
WHERE id = $8
"#,
)
.bind(status)
.bind(http_status)
.bind(response_headers_json)
.bind(response_body_snippet)
.bind(error_message)
.bind(duration_ms)
.bind(resolved_url)
.bind(delivery_id)
.execute(pool)
.await?;
Ok(())
}
/// Lists one page of deliveries for a webhook, newest id first.
///
/// `limit` and `offset` are passed straight to the SQL `LIMIT` / `OFFSET` clauses.
///
/// # Errors
/// Returns [`WebhookStoreError::Database`] if the query fails.
pub async fn list_webhook_deliveries(
    pool: &PgPool,
    webhook_id: i64,
    limit: i64,
    offset: i64,
) -> Result<Vec<GatewayWebhookDeliveryRecord>, WebhookStoreError> {
    const DELIVERIES_PAGE_SQL: &str = r#"
SELECT
id, created_at, webhook_id, trigger_route_key, request_id, idempotency_key,
status, http_status, response_headers_json, response_body_snippet,
error_message, duration_ms, resolved_url, context_snapshot
FROM gateway_webhook_delivery
WHERE webhook_id = $1
ORDER BY id DESC
LIMIT $2 OFFSET $3
"#;

    let deliveries = sqlx::query_as::<_, GatewayWebhookDeliveryRecord>(DELIVERIES_PAGE_SQL)
        .bind(webhook_id)
        .bind(limit)
        .bind(offset)
        .fetch_all(pool)
        .await?;

    Ok(deliveries)
}
/// Counts all webhooks, regardless of client.
///
/// Delegates to [`count_gateway_webhooks_filtered`] with no client filter.
///
/// # Errors
/// Returns [`WebhookStoreError::Database`] if the query fails.
pub async fn count_gateway_webhooks(pool: &PgPool) -> Result<i64, WebhookStoreError> {
    let no_client_filter: Option<&str> = None;
    count_gateway_webhooks_filtered(pool, no_client_filter).await
}
/// Counts webhooks, optionally restricted to one client.
///
/// `client_name` of `None` or `Some("")` means "no filter". If the query yields no
/// row, the count is reported as 0.
///
/// # Errors
/// Returns [`WebhookStoreError::Database`] if the query fails.
pub async fn count_gateway_webhooks_filtered(
    pool: &PgPool,
    client_name: Option<&str>,
) -> Result<i64, WebhookStoreError> {
    let total: Option<i64> = match client_name.filter(|wanted| !wanted.is_empty()) {
        Some(wanted) => {
            sqlx::query_scalar(
                "SELECT COUNT(*)::bigint FROM gateway_webhook WHERE client_name = $1",
            )
            .bind(wanted)
            .fetch_optional(pool)
            .await?
        }
        None => {
            sqlx::query_scalar("SELECT COUNT(*)::bigint FROM gateway_webhook")
                .fetch_optional(pool)
                .await?
        }
    };

    Ok(total.unwrap_or(0))
}
/// Counts the deliveries recorded for a webhook.
///
/// If the query yields no row, the count is reported as 0.
///
/// # Errors
/// Returns [`WebhookStoreError::Database`] if the query fails.
pub async fn count_webhook_deliveries(
    pool: &PgPool,
    webhook_id: i64,
) -> Result<i64, WebhookStoreError> {
    let total: Option<i64> = sqlx::query_scalar(
        "SELECT COUNT(*)::bigint FROM gateway_webhook_delivery WHERE webhook_id = $1",
    )
    .bind(webhook_id)
    .fetch_optional(pool)
    .await?;

    Ok(total.unwrap_or(0))
}