use actix_web::web::Data;
use sqlx::postgres::PgPool;
use athena_webhooks::sinks::GatewayWebhookSinkRecord;
use athena_webhooks::store::GatewayWebhookRecord;
pub use athena_webhooks::{
GatewayWebhookEventDescriptor, GatewayWebhookTrigger, ROUTE_BILLING_DOCUMENT_UPSERTED,
ROUTE_GATEWAY_DATA, ROUTE_GATEWAY_DELETE, ROUTE_GATEWAY_FETCH, ROUTE_GATEWAY_INSERT,
ROUTE_GATEWAY_QUERY, ROUTE_GATEWAY_UPDATE, ROUTE_REST_DELETE, ROUTE_REST_GET, ROUTE_REST_PATCH,
ROUTE_REST_POST, WebhookRuntime, gateway_webhook_event_catalog,
gateway_webhook_trigger_from_http,
};
use crate::AppState;
pub mod schema;
#[derive(Clone)]
struct AthenaWebhookRuntime {
app_state: Data<AppState>,
}
impl WebhookRuntime for AthenaWebhookRuntime {
fn pool_for_client(&self, client_name: &str) -> Option<PgPool> {
self.app_state.pg_registry.get_pool(client_name)
}
fn http_client(&self) -> &reqwest::Client {
&self.app_state.client
}
}
struct BorrowedAthenaWebhookRuntime<'a> {
app_state: &'a AppState,
}
impl WebhookRuntime for BorrowedAthenaWebhookRuntime<'_> {
fn pool_for_client(&self, client_name: &str) -> Option<PgPool> {
self.app_state.pg_registry.get_pool(client_name)
}
fn http_client(&self) -> &reqwest::Client {
&self.app_state.client
}
}
pub fn spawn_gateway_webhook_dispatch(app_state: Data<AppState>, trigger: GatewayWebhookTrigger) {
athena_webhooks::spawn_gateway_webhook_dispatch(AthenaWebhookRuntime { app_state }, trigger);
}
pub fn spawn_gateway_webhook_sink_dispatch(
app_state: Data<AppState>,
trigger: GatewayWebhookTrigger,
) {
athena_webhooks::spawn_gateway_webhook_sink_dispatch(
AthenaWebhookRuntime { app_state },
trigger,
);
}
pub async fn dispatch_webhook_test(
app_state: &AppState,
catalog_pool: &PgPool,
webhook: &GatewayWebhookRecord,
trigger: GatewayWebhookTrigger,
) -> Result<(i64, String, Option<i32>, Option<String>), String> {
let runtime: BorrowedAthenaWebhookRuntime<'_> = BorrowedAthenaWebhookRuntime { app_state };
athena_webhooks::dispatch_webhook_test(&runtime, catalog_pool, webhook, trigger).await
}
pub async fn dispatch_webhook_sink_test(
app_state: &AppState,
catalog_pool: &PgPool,
sink: &GatewayWebhookSinkRecord,
trigger: GatewayWebhookTrigger,
) -> Result<(i64, String, String, Option<i32>, serde_json::Value), String> {
let runtime: BorrowedAthenaWebhookRuntime<'_> = BorrowedAthenaWebhookRuntime { app_state };
athena_webhooks::dispatch_webhook_sink_test(&runtime, catalog_pool, sink, trigger).await
}