use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use sqlx::postgres::{PgPool, PgRow};
use sqlx::{Postgres, Row, Transaction};
use uuid::Uuid;
/// A row of `public.athena_outbox` as read back from the database.
///
/// Produced by `map_outbox_row`, which decodes each field from the column of
/// the same name.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OutboxEvent {
/// Identifier of the event row; returned by the insert's `RETURNING` clause.
// NOTE(review): presumably generated by a column default — confirm against the schema.
pub event_id: Uuid,
/// Type of the aggregate the event belongs to.
pub aggregate_type: String,
/// Identifier of the aggregate instance, stored as text.
pub aggregate_id: String,
/// Name of the event.
pub event_type: String,
/// Event body as arbitrary JSON.
pub payload: Value,
/// Event headers as arbitrary JSON.
pub headers: Value,
/// Row creation time.
// NOTE(review): never written by the code in this file; presumably a column default — confirm.
pub created_at: DateTime<Utc>,
/// Earliest time at which the event may be leased; pushed forward after a failed attempt.
pub available_at: DateTime<Utc>,
/// Number of times the event has been leased (incremented on every lease).
pub attempts: i32,
/// Error message recorded by the most recent failed attempt, if any.
pub last_error: Option<String>,
/// Time the event was marked as published; `None` while it is still unpublished.
pub published_at: Option<DateTime<Utc>>,
}
/// Caller-supplied values for a new outbox row, consumed by `insert_outbox_event_tx`.
#[derive(Debug, Clone)]
pub struct OutboxEventInsert {
/// Type of the aggregate the event belongs to.
pub aggregate_type: String,
/// Identifier of the aggregate instance, stored as text.
pub aggregate_id: String,
/// Name of the event.
pub event_type: String,
/// Event body as arbitrary JSON.
pub payload: Value,
/// Event headers as arbitrary JSON.
pub headers: Value,
/// Earliest time the event may be leased; `None` means "now" (the
/// application clock at insert time).
pub available_at: Option<DateTime<Utc>>,
}
fn map_outbox_row(row: &PgRow) -> Result<OutboxEvent, sqlx::Error> {
Ok(OutboxEvent {
event_id: row.try_get("event_id")?,
aggregate_type: row.try_get("aggregate_type")?,
aggregate_id: row.try_get("aggregate_id")?,
event_type: row.try_get("event_type")?,
payload: row.try_get("payload")?,
headers: row.try_get("headers")?,
created_at: row.try_get("created_at")?,
available_at: row.try_get("available_at")?,
attempts: row.try_get("attempts")?,
last_error: row.try_get("last_error")?,
published_at: row.try_get("published_at")?,
})
}
/// Inserts a new event into `public.athena_outbox` inside the caller's transaction.
///
/// The statement runs on `tx`, so the row is committed or rolled back together
/// with whatever else the caller does in that transaction.
///
/// When `params.available_at` is `None`, the current application time
/// (`Utc::now()`) is used, making the event eligible for leasing immediately.
///
/// # Returns
///
/// The `event_id` of the inserted row, as reported by `RETURNING`.
///
/// # Errors
///
/// Returns `sqlx::Error` if the insert fails or the returned `event_id`
/// cannot be decoded.
pub async fn insert_outbox_event_tx(
    tx: &mut Transaction<'_, Postgres>,
    params: OutboxEventInsert,
) -> Result<Uuid, sqlx::Error> {
    let available_at = params.available_at.unwrap_or_else(Utc::now);
    let row = sqlx::query(
        r#"
INSERT INTO public.athena_outbox
(aggregate_type, aggregate_id, event_type, payload, headers, available_at)
VALUES ($1, $2, $3, $4, $5, $6)
RETURNING event_id
"#,
    )
    // Bind by reference: `params` outlives the query, so no clones are needed.
    .bind(&params.aggregate_type)
    .bind(&params.aggregate_id)
    .bind(&params.event_type)
    .bind(&params.payload)
    .bind(&params.headers)
    .bind(available_at)
    .fetch_one(&mut **tx)
    .await?;
    row.try_get("event_id")
}
pub async fn lease_pending_outbox_events(
pool: &PgPool,
batch_size: i32,
max_attempts: i32,
) -> Result<Vec<OutboxEvent>, sqlx::Error> {
let rows = sqlx::query(
r#"
UPDATE public.athena_outbox
SET attempts = attempts + 1
WHERE event_id IN (
SELECT event_id
FROM public.athena_outbox
WHERE published_at IS NULL
AND available_at <= now()
AND attempts < $2
ORDER BY available_at
LIMIT $1
FOR UPDATE SKIP LOCKED
)
RETURNING
event_id,
aggregate_type,
aggregate_id,
event_type,
payload,
headers,
created_at,
available_at,
attempts,
last_error,
published_at
"#,
)
.bind(batch_size)
.bind(max_attempts)
.fetch_all(pool)
.await?;
rows.iter().map(map_outbox_row).collect()
}
/// Stamps the event identified by `event_id` as published at the database's `now()`.
///
/// Only rows whose `published_at` is still `NULL` are touched, so an existing
/// publish timestamp is never overwritten. The number of affected rows is not
/// inspected: an unknown or already-published `event_id` still yields `Ok(())`.
///
/// # Errors
///
/// Returns `sqlx::Error` if the update fails.
pub async fn mark_outbox_published(pool: &PgPool, event_id: Uuid) -> Result<(), sqlx::Error> {
    const MARK_PUBLISHED_SQL: &str = r#"
UPDATE public.athena_outbox
SET published_at = now()
WHERE event_id = $1
AND published_at IS NULL
"#;

    sqlx::query(MARK_PUBLISHED_SQL)
        .bind(event_id)
        .execute(pool)
        .await
        .map(|_| ())
}
/// Records a failed delivery attempt and schedules the next retry.
///
/// Stores `error` in `last_error` and moves `available_at` to
/// `now() + min(2^attempts, 600)` seconds, using the row's current `attempts`
/// value. The `attempts` counter itself is not changed here. Rows that are
/// already published are left untouched, and the number of affected rows is
/// not inspected.
///
/// # Errors
///
/// Returns `sqlx::Error` if the update fails.
pub async fn mark_outbox_failed_attempt(
    pool: &PgPool,
    event_id: Uuid,
    error: &str,
) -> Result<(), sqlx::Error> {
    const MARK_FAILED_SQL: &str = r#"
UPDATE public.athena_outbox
SET last_error = $2,
available_at = now() + (least(power(2, attempts), 600) * interval '1 second')
WHERE event_id = $1
AND published_at IS NULL
"#;

    // $1 = event id, $2 = error text.
    sqlx::query(MARK_FAILED_SQL)
        .bind(event_id)
        .bind(error)
        .execute(pool)
        .await
        .map(|_| ())
}
/// Counts unpublished events that have not yet reached the attempt ceiling.
///
/// A row is counted when `published_at IS NULL` and `attempts < max_attempts`.
/// `available_at` is not considered, so events waiting out a retry delay are
/// included.
///
/// # Errors
///
/// Returns `sqlx::Error` if the query fails or the count cannot be decoded.
pub async fn count_pending_outbox_events(
    pool: &PgPool,
    max_attempts: i32,
) -> Result<i64, sqlx::Error> {
    const COUNT_PENDING_SQL: &str = r#"
SELECT COUNT(*) AS cnt
FROM public.athena_outbox
WHERE published_at IS NULL
AND attempts < $1
"#;

    let count_row = sqlx::query(COUNT_PENDING_SQL)
        .bind(max_attempts)
        .fetch_one(pool)
        .await?;
    let pending: i64 = count_row.try_get("cnt")?;
    Ok(pending)
}
/// Counts unpublished events that have reached or exceeded the attempt ceiling.
///
/// A row is counted when `published_at IS NULL` and `attempts >= max_attempts`;
/// such rows are no longer selected by `lease_pending_outbox_events` when it
/// is called with the same ceiling.
///
/// # Errors
///
/// Returns `sqlx::Error` if the query fails or the count cannot be decoded.
pub async fn count_poisoned_outbox_events(
    pool: &PgPool,
    max_attempts: i32,
) -> Result<i64, sqlx::Error> {
    const COUNT_POISONED_SQL: &str = r#"
SELECT COUNT(*) AS cnt
FROM public.athena_outbox
WHERE published_at IS NULL
AND attempts >= $1
"#;

    let count_row = sqlx::query(COUNT_POISONED_SQL)
        .bind(max_attempts)
        .fetch_one(pool)
        .await?;
    let poisoned: i64 = count_row.try_get("cnt")?;
    Ok(poisoned)
}