athena_rs 3.26.2

Hyper-performant polyglot database driver
Documentation
//! Outbox relay worker.
//!
//! Polls `public.athena_outbox` for undelivered events, dispatches the
//! associated side effects (CDC events, webhooks), and marks each row
//! published.  Failed attempts are retried with truncated exponential
//! back-off; rows that exceed `max_attempts` are left as dead-letter
//! entries and reported through the `outbox_poisoned_total` warning log.
//!
//! # Startup
//!
//! ```rust,ignore
//! // In main.rs, after build_shared_state:
//! athena_rs::workers::outbox_relay::spawn_outbox_relay_worker(app_state.clone());
//! ```
//!
//! The worker is **enabled only when `ATHENA_OUTBOX_ENABLED=true`** is set.
//! When disabled, the function returns immediately without spawning anything.
//! This lets operators roll the relay in gradually (dual-publish phase) before
//! fully switching callsites away from direct `tokio::spawn` side effects.

use std::env;
use std::time::Duration;

use actix_web::web::Data;
use serde_json::Value;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

use crate::AppState;
use crate::data::events::post_event;
use crate::data::outbox::{
    OutboxEvent, count_pending_outbox_events, count_poisoned_outbox_events,
    lease_pending_outbox_events, mark_outbox_failed_attempt, mark_outbox_published,
};
use crate::data::webhooks::cleanup_webhook_state_ledger_older_than;
use crate::webhooks::{GatewayWebhookTrigger, spawn_gateway_webhook_dispatch};

// ---------------------------------------------------------------------------
// Configuration constants (overridable via env)
// ---------------------------------------------------------------------------

/// Enables the relay when set to a truthy value; unset (or any other value)
/// leaves the worker off.
const ENV_OUTBOX_ENABLED: &str = "ATHENA_OUTBOX_ENABLED";

/// Milliseconds to sleep between polls (clamped to at least 100 at startup).
const ENV_OUTBOX_POLL_MS: &str = "ATHENA_OUTBOX_POLL_MS";
/// Poll interval used when `ATHENA_OUTBOX_POLL_MS` is unset or unparseable.
const DEFAULT_POLL_MS: u64 = 1000;

/// Maximum number of rows leased per poll (clamped to at least 1 at startup).
const ENV_OUTBOX_BATCH_SIZE: &str = "ATHENA_OUTBOX_BATCH_SIZE";
/// Batch size used when `ATHENA_OUTBOX_BATCH_SIZE` is unset or unparseable.
const DEFAULT_BATCH_SIZE: i32 = 50;

/// Attempt limit handed to the lease and poisoned-count queries
/// (clamped to at least 1 at startup).
const ENV_OUTBOX_MAX_ATTEMPTS: &str = "ATHENA_OUTBOX_MAX_ATTEMPTS";
/// Attempt limit used when `ATHENA_OUTBOX_MAX_ATTEMPTS` is unset or unparseable.
const DEFAULT_MAX_ATTEMPTS: i32 = 10;

/// Age in seconds beyond which webhook state ledger rows are cleaned up;
/// `0` (the fallback when unset) disables the cleanup.
const ENV_WEBHOOK_LEDGER_RETENTION_SECONDS: &str = "ATHENA_WEBHOOK_LEDGER_RETENTION_SECONDS";

/// Minimum seconds between webhook ledger cleanup runs
/// (clamped to at least 30 at startup).
const ENV_WEBHOOK_LEDGER_CLEANUP_EVERY_SECONDS: &str =
    "ATHENA_WEBHOOK_LEDGER_CLEANUP_EVERY_SECONDS";
/// Cleanup cadence used when the env var above is unset or unparseable.
const DEFAULT_WEBHOOK_LEDGER_CLEANUP_EVERY_SECONDS: u64 = 300;

// ---------------------------------------------------------------------------
// Public entry point
// ---------------------------------------------------------------------------

/// Spawn the background outbox relay worker.
///
/// Reads `ATHENA_OUTBOX_ENABLED` to decide whether to start; silently
/// returns when the feature is disabled.  Requires a logging-DB pool
/// (`app_state.logging_client_name`) — exits immediately and logs a warning
/// when none is configured.
pub fn spawn_outbox_relay_worker(app_state: Data<AppState>) {
    let enabled: bool = env::var(ENV_OUTBOX_ENABLED)
        .map(|v| matches!(v.trim(), "1" | "true" | "TRUE" | "yes" | "YES"))
        .unwrap_or(false);

    if !enabled {
        debug!("Outbox relay worker is disabled (set ATHENA_OUTBOX_ENABLED=true to enable)");
        return;
    }

    let poll_ms: u64 = env::var(ENV_OUTBOX_POLL_MS)
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(DEFAULT_POLL_MS)
        .max(100);

    let batch_size: i32 = env::var(ENV_OUTBOX_BATCH_SIZE)
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(DEFAULT_BATCH_SIZE)
        .max(1);

    let max_attempts: i32 = env::var(ENV_OUTBOX_MAX_ATTEMPTS)
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(DEFAULT_MAX_ATTEMPTS)
        .max(1);

    let Some(logging_client) = app_state.logging_client_name.clone() else {
        warn!(
            "Outbox relay worker enabled but no logging client configured; \
             skipping (set logging_client in config.yaml)"
        );
        return;
    };

    let Some(pool) = app_state.pg_registry.get_pool(&logging_client) else {
        warn!(
            client = %logging_client,
            "Outbox relay worker: logging pool unavailable at startup; skipping"
        );
        return;
    };

    // Clone Data<AppState> (cheaply — it wraps an Arc) into the spawned task.
    let worker_state: Data<AppState> = app_state.clone();

    info!(
        poll_ms,
        batch_size, max_attempts, "Starting outbox relay worker"
    );

    let webhook_ledger_retention_seconds: i64 = env::var(ENV_WEBHOOK_LEDGER_RETENTION_SECONDS)
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(0)
        .max(0);
    let webhook_ledger_cleanup_every_seconds: u64 =
        env::var(ENV_WEBHOOK_LEDGER_CLEANUP_EVERY_SECONDS)
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(DEFAULT_WEBHOOK_LEDGER_CLEANUP_EVERY_SECONDS)
            .max(30);

    tokio::spawn(async move {
        let poll_interval: Duration = Duration::from_millis(poll_ms);
        let mut last_webhook_cleanup = std::time::Instant::now();
        loop {
            match lease_pending_outbox_events(&pool, batch_size, max_attempts).await {
                Err(err) => {
                    warn!(error = %err, "Outbox relay: lease query failed; will retry");
                }
                Ok(events) => {
                    let count: usize = events.len();
                    if count > 0 {
                        debug!(count, "Outbox relay: dispatching batch");
                    }
                    for event in events {
                        dispatch_outbox_event(&worker_state, &pool, event).await;
                    }
                }
            }

            // Emit health metrics periodically.
            emit_outbox_metrics(&pool, max_attempts).await;

            if webhook_ledger_retention_seconds > 0
                && last_webhook_cleanup.elapsed()
                    >= Duration::from_secs(webhook_ledger_cleanup_every_seconds)
            {
                match cleanup_webhook_state_ledger_older_than(
                    &pool,
                    webhook_ledger_retention_seconds,
                )
                .await
                {
                    Ok(removed) => {
                        if removed > 0 {
                            info!(
                                removed,
                                retention_seconds = webhook_ledger_retention_seconds,
                                "webhook_state_ledger_cleanup_removed"
                            );
                        }
                    }
                    Err(err) => {
                        warn!(error = %err, "webhook_state_ledger_cleanup_failed");
                    }
                }
                last_webhook_cleanup = std::time::Instant::now();
            }

            tokio::time::sleep(poll_interval).await;
        }
    });
}

// ---------------------------------------------------------------------------
// Per-event dispatch
// ---------------------------------------------------------------------------

/// Dispatch one leased outbox row and record the outcome in the outbox table.
///
/// The row is routed by `event_type`:
/// - `mutation.insert` / `mutation.update` / `mutation.delete` go to
///   `dispatch_mutation_event`;
/// - `webhook.trigger` goes to `dispatch_webhook_event`;
/// - any other type is logged and treated as handled, so it is not retried forever.
///
/// A successful dispatch marks the row published. A failed dispatch records
/// the error text as a failed attempt. Errors from either bookkeeping call are
/// logged and not propagated, so one bad row cannot stop the relay loop.
async fn dispatch_outbox_event(
    state: &Data<AppState>,
    pool: &sqlx::postgres::PgPool,
    event: OutboxEvent,
) {
    let event_id: Uuid = event.event_id;
    let kind: &str = event.event_type.as_str();

    let outcome: Result<(), String> = match kind {
        "webhook.trigger" => dispatch_webhook_event(state, &event).await,
        "mutation.insert" | "mutation.update" | "mutation.delete" => {
            dispatch_mutation_event(state.get_ref(), &event).await
        }
        unrecognised => {
            warn!(
                event_id = %event_id,
                event_type = %unrecognised,
                "Outbox relay: unknown event_type; marking published to avoid infinite retry"
            );
            Ok(())
        }
    };

    if let Err(reason) = outcome {
        warn!(
            event_id = %event_id,
            error = %reason,
            attempts = event.attempts,
            "Outbox relay: dispatch failed; recording attempt"
        );
        if let Err(db_err) = mark_outbox_failed_attempt(pool, event_id, &reason).await {
            error!(
                event_id = %event_id,
                error = %db_err,
                "Outbox relay: failed to record failed attempt"
            );
        }
    } else if let Err(db_err) = mark_outbox_published(pool, event_id).await {
        // The row stays unpublished, so it will be leased (and dispatched) again.
        error!(
            event_id = %event_id,
            error = %db_err,
            "Outbox relay: failed to mark event published (will re-lease)"
        );
    }
}

// ---------------------------------------------------------------------------
// Dispatch strategies per event_type
// ---------------------------------------------------------------------------

async fn dispatch_mutation_event(_state: &AppState, event: &OutboxEvent) -> Result<(), String> {
    let company_id: String = event
        .headers
        .get("company_id")
        .and_then(Value::as_str)
        .unwrap_or("")
        .to_string();

    if company_id.is_empty() {
        // Event has no company context — still considered successfully handled.
        debug!(
            event_id = %event.event_id,
            "Outbox relay: mutation event has no company_id in headers; skipping CDC publish"
        );
        return Ok(());
    }

    post_event(company_id, event.payload.clone()).await;
    Ok(())
}

/// Re-hydrate a `GatewayWebhookTrigger` from a `webhook.trigger` outbox row and
/// hand it to `spawn_gateway_webhook_dispatch`.
///
/// Field sources:
/// - from the row's `headers`: `client_name` (default `""`) and `request_id`
///   (optional);
/// - from the row's `payload`: `route_key` (default `""`), `table_name`
///   (optional), `request_method` (default `"POST"`), `request_path`
///   (default `""`), `request_payload` / `response_payload` (optional), and
///   `headers`, an array of `[name, value]` string pairs. Entries that are not
///   two strings are skipped.
///
/// A JSON `null` in `request_payload` / `response_payload` is treated as absent
/// (`None`). This matches how serde represents `Option::None`, so a trigger
/// stored without a body round-trips as `None` instead of `Some(Value::Null)`.
///
/// Returns `Ok(())` as soon as the dispatch has been spawned.
///
/// NOTE(review): `spawn_gateway_webhook_dispatch` appears to be fire-and-forget,
/// so a delivery failure after this point never reaches the outbox retry logic,
/// and an empty `route_key` / `client_name` is dispatched as-is. Confirm both
/// are intended.
async fn dispatch_webhook_event(state: &Data<AppState>, event: &OutboxEvent) -> Result<(), String> {
    // Re-hydrate a GatewayWebhookTrigger from the outbox payload.
    let payload: &Value = &event.payload;

    let client_name: String = event
        .headers
        .get("client_name")
        .and_then(Value::as_str)
        .unwrap_or("")
        .to_string();
    let route_key: String = payload
        .get("route_key")
        .and_then(Value::as_str)
        .unwrap_or("")
        .to_string();
    let table_name: Option<String> = payload
        .get("table_name")
        .and_then(Value::as_str)
        .map(str::to_string);
    let request_id: Option<String> = event
        .headers
        .get("request_id")
        .and_then(Value::as_str)
        .map(str::to_string);
    let request_method: String = payload
        .get("request_method")
        .and_then(Value::as_str)
        .unwrap_or("POST")
        .to_string();
    let request_path: String = payload
        .get("request_path")
        .and_then(Value::as_str)
        .unwrap_or("")
        .to_string();
    // serde writes `Option::None` as `null`; map `null` back to `None` so an
    // absent body does not come back as `Some(Value::Null)`.
    let request_payload: Option<Value> = payload
        .get("request_payload")
        .filter(|value| !value.is_null())
        .cloned();
    let response_payload: Option<Value> = payload
        .get("response_payload")
        .filter(|value| !value.is_null())
        .cloned();

    // Raw header pairs stored in the outbox as `[name, value]` arrays; malformed
    // entries are dropped rather than failing the whole event.
    let headers: Vec<(String, String)> = payload
        .get("headers")
        .and_then(Value::as_array)
        .map(|arr| {
            arr.iter()
                .filter_map(|pair| {
                    let name: String = pair.get(0).and_then(Value::as_str)?.to_string();
                    let value: String = pair.get(1).and_then(Value::as_str)?.to_string();
                    Some((name, value))
                })
                .collect()
        })
        .unwrap_or_default();

    let trigger: GatewayWebhookTrigger = GatewayWebhookTrigger {
        client_name,
        route_key,
        table_name,
        request_id,
        request_method,
        request_path,
        headers,
        payload: request_payload,
        response: response_payload,
    };

    // spawn_gateway_webhook_dispatch expects Data<AppState> — clone the Arc cheaply.
    spawn_gateway_webhook_dispatch(state.clone(), trigger);

    Ok(())
}

// ---------------------------------------------------------------------------
// Observability
// ---------------------------------------------------------------------------

/// Log outbox health counters read from the logging database.
///
/// - The pending count (for the given `max_attempts`) is logged at `debug`
///   as `outbox_pending_total`.
/// - A poisoned count greater than zero is logged at `warn` as
///   `outbox_poisoned_total`, together with `max_attempts`.
///
/// Query failures are logged at `debug` only and never propagated; the
/// pending query always runs before the poisoned query.
async fn emit_outbox_metrics(pool: &sqlx::postgres::PgPool, max_attempts: i32) {
    match count_pending_outbox_events(pool, max_attempts).await {
        Err(err) => {
            debug!(error = %err, "Outbox relay: pending count query failed");
        }
        Ok(pending) => {
            debug!(pending, "outbox_pending_total");
        }
    }

    match count_poisoned_outbox_events(pool, max_attempts).await {
        Err(err) => {
            debug!(error = %err, "Outbox relay: poisoned count query failed");
        }
        Ok(poisoned) if poisoned > 0 => {
            warn!(
                poisoned,
                max_attempts,
                "outbox_poisoned_total: events have exhausted all retries and require manual inspection"
            );
        }
        Ok(_) => {}
    }
}