athena_rs 3.26.3

Hyper-performant polyglot database driver
//! Transactional outbox data-access layer.
//!
//! Rows in `public.athena_outbox` are written atomically inside the same
//! database transaction as the source state mutation.  A background relay
//! worker (see `src/workers/outbox_relay.rs`) leases pending rows, dispatches
//! the associated side effect, and marks them published.
//!
//! # Usage pattern
//!
//! ```text
//! let mut tx = pool.begin().await?;
//! // … your primary state mutation …
//! outbox::insert_outbox_event_tx(&mut tx, OutboxEventInsert { … }).await?;
//! tx.commit().await?; // event_id is durable only if commit succeeds
//! ```

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use sqlx::postgres::{PgPool, PgRow};
use sqlx::{Postgres, Row, Transaction};
use uuid::Uuid;

// ---------------------------------------------------------------------------
// Public types
// ---------------------------------------------------------------------------

/// A row as read from `public.athena_outbox`.
///
/// Instances are produced by the relay read path (see `map_outbox_row`);
/// application code enqueues events through `OutboxEventInsert` instead.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OutboxEvent {
    /// Identifier assigned by the database on insert (the insert path does not
    /// supply it and reads it back via `RETURNING event_id`).
    pub event_id: Uuid,
    /// Broad subsystem owning the mutation (e.g. `"gateway"`, `"management"`).
    pub aggregate_type: String,
    /// Primary key / routing key of the mutated entity.
    pub aggregate_id: String,
    /// Fine-grained event discriminant (e.g. `"mutation.insert"`).
    pub event_type: String,
    /// Serialised event body.
    pub payload: Value,
    /// Correlation metadata (request_id, client_name, company_id, …).
    pub headers: Value,
    /// Insertion time. Not supplied by the insert path, so it is presumably a
    /// column default on the table — TODO confirm against the migration.
    pub created_at: DateTime<Utc>,
    /// Earliest instant the row may be leased (`available_at <= now()`);
    /// failed attempts push it forward by the retry back-off.
    pub available_at: DateTime<Utc>,
    /// Number of times the row has been leased for dispatch.
    pub attempts: i32,
    /// Error message recorded by the most recent failed dispatch, if any.
    pub last_error: Option<String>,
    /// When the event was successfully published; `None` while still pending.
    pub published_at: Option<DateTime<Utc>>,
}

/// Parameters for inserting a new outbox event inside an open transaction.
///
/// Consumed by `insert_outbox_event_tx`; the database assigns `event_id`.
#[derive(Debug, Clone)]
pub struct OutboxEventInsert {
    /// Broad subsystem owning the mutation: `"gateway"`, `"management"`,
    /// `"deferred_write"`, etc.
    pub aggregate_type: String,
    /// Primary key / routing key of the mutated entity (table name, entity id …).
    pub aggregate_id: String,
    /// Fine-grained event discriminant: `"mutation.insert"`, `"mutation.update"`,
    /// `"mutation.delete"`, `"ddl.applied"`, `"webhook.trigger"`.
    pub event_type: String,
    /// Serialised event body (request/response snapshot, DDL statement, …).
    pub payload: Value,
    /// Correlation metadata (request_id, client_name, company_id, …).
    pub headers: Value,
    /// Earliest instant the relay worker may process this event.
    /// When `None`, the event is eligible for processing immediately.
    pub available_at: Option<DateTime<Utc>>,
}

// ---------------------------------------------------------------------------
// Row mapper
// ---------------------------------------------------------------------------

/// Decode one `public.athena_outbox` row into an [`OutboxEvent`].
///
/// Columns are looked up by name, so the query only needs to project every
/// column under its canonical name (aliases are fine). The first column that
/// is missing or fails to decode aborts the mapping with that `sqlx::Error`.
fn map_outbox_row(row: &PgRow) -> Result<OutboxEvent, sqlx::Error> {
    let event_id = row.try_get("event_id")?;
    let aggregate_type = row.try_get("aggregate_type")?;
    let aggregate_id = row.try_get("aggregate_id")?;
    let event_type = row.try_get("event_type")?;
    let payload = row.try_get("payload")?;
    let headers = row.try_get("headers")?;
    let created_at = row.try_get("created_at")?;
    let available_at = row.try_get("available_at")?;
    let attempts = row.try_get("attempts")?;
    let last_error = row.try_get("last_error")?;
    let published_at = row.try_get("published_at")?;

    Ok(OutboxEvent {
        event_id,
        aggregate_type,
        aggregate_id,
        event_type,
        payload,
        headers,
        created_at,
        available_at,
        attempts,
        last_error,
        published_at,
    })
}

// ---------------------------------------------------------------------------
// Write path (called inside the source-mutation transaction)
// ---------------------------------------------------------------------------

/// Insert one outbox event **inside an existing transaction**.
///
/// This is the only correct way to enqueue an outbox event – calling it
/// outside a transaction loses the atomicity guarantee.
/// Insert one outbox event **inside an existing transaction**.
///
/// This is the only correct way to enqueue an outbox event – calling it
/// outside a transaction loses the atomicity guarantee.  The event becomes
/// visible to the relay worker only once `tx` commits.
///
/// When `params.available_at` is `None`, the default is taken from the
/// database clock (`now()`) rather than the application clock.  The relay
/// compares `available_at` against the database's `now()`, so defaulting on
/// the application side would let app/DB clock skew delay (or reorder) events.
///
/// # Returns
///
/// The `event_id` assigned by the database.
///
/// # Errors
///
/// Any `sqlx::Error` raised by the INSERT or while decoding `event_id`.
pub async fn insert_outbox_event_tx(
    tx: &mut Transaction<'_, Postgres>,
    params: OutboxEventInsert,
) -> Result<Uuid, sqlx::Error> {
    let row = sqlx::query(
        r#"
        INSERT INTO public.athena_outbox
            (aggregate_type, aggregate_id, event_type, payload, headers, available_at)
        VALUES ($1, $2, $3, $4, $5, COALESCE($6, now()))
        RETURNING event_id
        "#,
    )
    .bind(&params.aggregate_type)
    .bind(&params.aggregate_id)
    .bind(&params.event_type)
    .bind(&params.payload)
    .bind(&params.headers)
    // Bound as a nullable timestamptz; NULL falls back to the DB clock above.
    .bind(params.available_at)
    .fetch_one(&mut **tx)
    .await?;

    row.try_get("event_id")
}

// ---------------------------------------------------------------------------
// Relay worker read path
// ---------------------------------------------------------------------------

/// Lease up to `batch_size` pending outbox events for processing.
///
/// Uses `FOR UPDATE SKIP LOCKED` so multiple relay worker instances can
/// run concurrently without row-level conflicts.  Leased rows are
/// immediately incremented by one attempt so a crashed worker does not
/// replay indefinitely without an explicit re-lease.
/// Lease up to `batch_size` pending outbox events for processing.
///
/// A row is a candidate when it meets all three conditions:
/// - `published_at IS NULL`
/// - `available_at <= now()`
/// - `attempts < max_attempts`
///
/// Candidates are taken oldest `available_at` first.  They are selected with
/// `FOR UPDATE SKIP LOCKED`, so concurrent relay workers never block on (or
/// double-select) rows another worker is leasing at the same instant.
///
/// This statement commits on its own, so those row locks are released as soon
/// as it returns.  To keep a leased row from being picked up again while it is
/// being dispatched, the lease also pushes `available_at` five minutes into
/// the future (a visibility timeout).  After that:
/// - `mark_outbox_published` takes the row out of the pending set for good.
/// - `mark_outbox_failed_attempt` replaces the lease expiry with the retry
///   back-off.
/// - If the worker crashes, the row becomes eligible again once the lease
///   expires.
///
/// The timeout must exceed the longest expected dispatch.  Otherwise a slow
/// dispatch can still be duplicated by another worker.
///
/// `attempts` is incremented as part of the lease.  A worker that crashes
/// mid-dispatch therefore still consumes an attempt, and a repeatedly
/// crashing event eventually reaches `max_attempts`.
///
/// # Returns
///
/// The leased events in ascending `available_at` order.  Each event's
/// `available_at` is the scheduled time the row had *before* it was leased,
/// not the lease expiry.  A non-positive `batch_size` returns an empty vector
/// without querying the database.
///
/// # Errors
///
/// Any `sqlx::Error` from executing the statement or decoding a row.
pub async fn lease_pending_outbox_events(
    pool: &PgPool,
    batch_size: i32,
    max_attempts: i32,
) -> Result<Vec<OutboxEvent>, sqlx::Error> {
    // PostgreSQL rejects a negative LIMIT; zero would be a wasted round-trip.
    if batch_size <= 0 {
        return Ok(Vec::new());
    }

    // The CTE locks and captures the candidates exactly once; the UPDATE then
    // joins against them, so the pre-lease `available_at` can be returned.
    let rows = sqlx::query(
        r#"
        WITH leased AS (
            SELECT event_id, available_at
            FROM   public.athena_outbox
            WHERE  published_at IS NULL
              AND  available_at  <= now()
              AND  attempts      <  $2
            ORDER BY available_at
            LIMIT $1
            FOR UPDATE SKIP LOCKED
        )
        UPDATE public.athena_outbox AS o
        SET    attempts     = o.attempts + 1,
               available_at = now() + interval '5 minutes'
        FROM   leased
        WHERE  o.event_id = leased.event_id
        RETURNING
            o.event_id,
            o.aggregate_type,
            o.aggregate_id,
            o.event_type,
            o.payload,
            o.headers,
            o.created_at,
            leased.available_at AS available_at,
            o.attempts,
            o.last_error,
            o.published_at
        "#,
    )
    .bind(batch_size)
    .bind(max_attempts)
    .fetch_all(pool)
    .await?;

    let mut events = rows
        .iter()
        .map(map_outbox_row)
        .collect::<Result<Vec<_>, _>>()?;
    // RETURNING order is unspecified; restore the FIFO order used for selection.
    events.sort_by_key(|event| event.available_at);
    Ok(events)
}

// ---------------------------------------------------------------------------
// Relay worker write path
// ---------------------------------------------------------------------------

/// Mark an outbox event as successfully published.
/// Stamp `published_at = now()` on an outbox event that has been dispatched.
///
/// Only rows that are still pending (`published_at IS NULL`) are touched, so a
/// repeated call keeps the first timestamp.  An `event_id` that matches no
/// pending row is not an error; the UPDATE simply affects zero rows.
pub async fn mark_outbox_published(pool: &PgPool, event_id: Uuid) -> Result<(), sqlx::Error> {
    let outcome = sqlx::query(
        r#"
        UPDATE public.athena_outbox
        SET published_at = now()
        WHERE event_id = $1
          AND published_at IS NULL
        "#,
    )
    .bind(event_id)
    .execute(pool)
    .await;

    // The affected-row count is intentionally discarded.
    outcome.map(|_done| ())
}

/// Record a failed dispatch attempt and schedule the next retry with
/// truncated exponential back-off (2^attempts seconds, capped at 10 min).
/// Record a failed dispatch attempt and schedule the next retry with
/// truncated exponential back-off (2^attempts seconds, capped at 10 min).
///
/// `attempts` is the row's current counter, which the lease has already
/// incremented.  The first failure therefore waits 2 s, then 4 s, 8 s, …,
/// up to 600 s.  Rows already marked published are left untouched.
///
/// NUL characters in `error` are replaced with U+FFFD before storage.
/// PostgreSQL `text` cannot hold U+0000, so an unsanitised message (e.g. a
/// raw HTTP response body) would make this UPDATE fail.  The failure would
/// then go unrecorded and no back-off would be scheduled.
///
/// # Errors
///
/// Any `sqlx::Error` from executing the UPDATE.
pub async fn mark_outbox_failed_attempt(
    pool: &PgPool,
    event_id: Uuid,
    error: &str,
) -> Result<(), sqlx::Error> {
    let error = error.replace('\0', "\u{FFFD}");
    sqlx::query(
        r#"
        UPDATE public.athena_outbox
        SET last_error   = $2,
            available_at = now() + (least(power(2, attempts), 600) * interval '1 second')
        WHERE event_id    = $1
          AND published_at IS NULL
        "#,
    )
    .bind(event_id)
    .bind(error)
    .execute(pool)
    .await?;
    Ok(())
}

// ---------------------------------------------------------------------------
// Observability helpers
// ---------------------------------------------------------------------------

/// Count pending (undelivered) events that have not exceeded `max_attempts`.
/// Used by the relay worker's health/metrics endpoint.
/// Count outbox rows that are still undelivered and remain eligible for retry.
///
/// A row counts when `published_at` is NULL and `attempts < max_attempts`.
/// Rows scheduled for a future `available_at` are included.  Intended for the
/// relay worker's health/metrics endpoint.
pub async fn count_pending_outbox_events(
    pool: &PgPool,
    max_attempts: i32,
) -> Result<i64, sqlx::Error> {
    let sql = r#"
        SELECT COUNT(*) AS cnt
        FROM   public.athena_outbox
        WHERE  published_at IS NULL
          AND  attempts < $1
        "#;

    sqlx::query(sql)
        .bind(max_attempts)
        .fetch_one(pool)
        .await
        .and_then(|row| row.try_get::<i64, _>("cnt"))
}

/// Count events that have exhausted all retries (dead-letter candidates).
pub async fn count_poisoned_outbox_events(
    pool: &PgPool,
    max_attempts: i32,
) -> Result<i64, sqlx::Error> {
    let row = sqlx::query(
        r#"
        SELECT COUNT(*) AS cnt
        FROM   public.athena_outbox
        WHERE  published_at IS NULL
          AND  attempts >= $1
        "#,
    )
    .bind(max_attempts)
    .fetch_one(pool)
    .await?;
    row.try_get("cnt")
}