athena_rs 3.26.1

Hyper performant polyglot Database driver
Documentation
//! Compatibility facade for the extracted `athena-webhooks` crate.
//!
//! Existing `athena_rs` callers continue to use `crate::webhooks::*`, but the
//! durable store and generic dispatch logic now live in the workspace crate.
//!
//!

use actix_web::web::Data;
use sqlx::postgres::PgPool;

use athena_webhooks::sinks::GatewayWebhookSinkRecord;
use athena_webhooks::store::GatewayWebhookRecord;
pub use athena_webhooks::{
    GatewayWebhookEventDescriptor, GatewayWebhookTrigger, ROUTE_BILLING_DOCUMENT_UPSERTED,
    ROUTE_GATEWAY_DATA, ROUTE_GATEWAY_DELETE, ROUTE_GATEWAY_FETCH, ROUTE_GATEWAY_INSERT,
    ROUTE_GATEWAY_QUERY, ROUTE_GATEWAY_UPDATE, ROUTE_REST_DELETE, ROUTE_REST_GET, ROUTE_REST_PATCH,
    ROUTE_REST_POST, WebhookRuntime, gateway_webhook_event_catalog,
    gateway_webhook_trigger_from_http,
};

use crate::AppState;

pub mod schema;

#[derive(Clone)]
struct AthenaWebhookRuntime {
    app_state: Data<AppState>,
}

impl WebhookRuntime for AthenaWebhookRuntime {
    fn pool_for_client(&self, client_name: &str) -> Option<PgPool> {
        self.app_state.pg_registry.get_pool(client_name)
    }

    fn http_client(&self) -> &reqwest::Client {
        &self.app_state.client
    }
}

/// ## `BorrowedAthenaWebhookRuntime`
///
/// ### Properties
/// AppState that carries all the base configuration and states
/// - `app_state`: `&'a AppState`
struct BorrowedAthenaWebhookRuntime<'a> {
    app_state: &'a AppState,
}

impl WebhookRuntime for BorrowedAthenaWebhookRuntime<'_> {
    fn pool_for_client(&self, client_name: &str) -> Option<PgPool> {
        self.app_state.pg_registry.get_pool(client_name)
    }

    fn http_client(&self) -> &reqwest::Client {
        &self.app_state.client
    }
}

/// Spawns the background webhook dispatcher using the shared `AppState`.
pub fn spawn_gateway_webhook_dispatch(app_state: Data<AppState>, trigger: GatewayWebhookTrigger) {
    athena_webhooks::spawn_gateway_webhook_dispatch(AthenaWebhookRuntime { app_state }, trigger);
}

/// Spawns the background webhook sink dispatcher using the shared `AppState`.
pub fn spawn_gateway_webhook_sink_dispatch(
    app_state: Data<AppState>,
    trigger: GatewayWebhookTrigger,
) {
    athena_webhooks::spawn_gateway_webhook_sink_dispatch(
        AthenaWebhookRuntime { app_state },
        trigger,
    );
}

/// Executes one explicit webhook test delivery using the shared Athena runtime.
pub async fn dispatch_webhook_test(
    app_state: &AppState,
    catalog_pool: &PgPool,
    webhook: &GatewayWebhookRecord,
    trigger: GatewayWebhookTrigger,
) -> Result<(i64, String, Option<i32>, Option<String>), String> {
    let runtime: BorrowedAthenaWebhookRuntime<'_> = BorrowedAthenaWebhookRuntime { app_state };
    athena_webhooks::dispatch_webhook_test(&runtime, catalog_pool, webhook, trigger).await
}

/// Executes one explicit webhook sink test using the shared Athena runtime.
pub async fn dispatch_webhook_sink_test(
    app_state: &AppState,
    catalog_pool: &PgPool,
    sink: &GatewayWebhookSinkRecord,
    trigger: GatewayWebhookTrigger,
) -> Result<(i64, String, String, Option<i32>, serde_json::Value), String> {
    let runtime: BorrowedAthenaWebhookRuntime<'_> = BorrowedAthenaWebhookRuntime { app_state };
    athena_webhooks::dispatch_webhook_sink_test(&runtime, catalog_pool, sink, trigger).await
}