Skip to main content

cratestack_sqlx/
descriptor.rs

1use crate::sqlx;
2
3use cratestack_core::{
4    CoolError, CoolEventBus, CoolEventEnvelope, CoolEventFuture, ModelEventKind,
5};
6
7use crate::error::cool_error_from_sqlx;
8
9#[derive(Debug, Clone)]
10pub struct SqlxRuntime {
11    pool: sqlx::PgPool,
12    events: CoolEventBus,
13}
14
15impl SqlxRuntime {
16    pub fn new(pool: sqlx::PgPool) -> Self {
17        Self {
18            pool,
19            events: CoolEventBus::default(),
20        }
21    }
22
23    pub fn pool(&self) -> &sqlx::PgPool {
24        &self.pool
25    }
26
27    #[doc(hidden)]
28    pub fn subscribe<F>(&self, model: &'static str, operation: ModelEventKind, handler: F)
29    where
30        F: Fn(CoolEventEnvelope) -> CoolEventFuture + Send + Sync + 'static,
31    {
32        self.events.subscribe(model, operation, handler);
33    }
34
35    #[doc(hidden)]
36    pub async fn drain_event_outbox(&self) -> Result<usize, CoolError> {
37        ensure_event_outbox_table(&self.pool).await?;
38
39        let rows = sqlx::query_as::<_, EventOutboxRow>(
40            "SELECT event_id, model, operation, occurred_at, payload, attempts, last_error \
41             FROM cratestack_event_outbox \
42             WHERE delivered_at IS NULL \
43             ORDER BY occurred_at ASC, event_id ASC",
44        )
45        .fetch_all(&self.pool)
46        .await
47        .map_err(cool_error_from_sqlx)?;
48
49        let mut delivered = 0usize;
50        for row in rows {
51            let event_id = row.event_id;
52            let envelope = row.try_into_envelope()?;
53            match self.events.emit(envelope).await {
54                Ok(()) => {
55                    sqlx::query(
56                        "UPDATE cratestack_event_outbox \
57                         SET delivered_at = NOW(), last_error = NULL, attempts = attempts + 1 \
58                         WHERE event_id = $1",
59                    )
60                    .bind(event_id)
61                    .execute(&self.pool)
62                    .await
63                    .map_err(cool_error_from_sqlx)?;
64                    delivered += 1;
65                }
66                Err(error) => {
67                    sqlx::query(
68                        "UPDATE cratestack_event_outbox \
69                         SET attempts = attempts + 1, last_error = $2 \
70                         WHERE event_id = $1",
71                    )
72                    .bind(event_id)
73                    .bind(error.to_string())
74                    .execute(&self.pool)
75                    .await
76                    .map_err(cool_error_from_sqlx)?;
77                }
78            }
79        }
80
81        Ok(delivered)
82    }
83}
84
85#[derive(Debug, Clone)]
86pub(crate) struct EventOutboxRow {
87    pub(crate) event_id: uuid::Uuid,
88    pub(crate) model: String,
89    pub(crate) operation: String,
90    pub(crate) occurred_at: chrono::DateTime<chrono::Utc>,
91    pub(crate) payload: serde_json::Value,
92    pub(crate) attempts: i64,
93    pub(crate) last_error: Option<String>,
94}
95
96// Hand-written `FromRow` impl. We can't use `#[derive(sqlx::FromRow)]` because
97// the derive macro hardcodes `::sqlx::*` paths that don't resolve through our
98// `crate::sqlx` shim (the shim is module-scoped, not crate-aliased).
99impl<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> for EventOutboxRow {
100    fn from_row(row: &'r sqlx::postgres::PgRow) -> Result<Self, sqlx::Error> {
101        use sqlx::Row;
102        Ok(Self {
103            event_id: row.try_get("event_id")?,
104            model: row.try_get("model")?,
105            operation: row.try_get("operation")?,
106            occurred_at: row.try_get("occurred_at")?,
107            payload: row.try_get("payload")?,
108            attempts: row.try_get("attempts")?,
109            last_error: row.try_get("last_error")?,
110        })
111    }
112}
113
114impl EventOutboxRow {
115    pub(crate) fn try_into_envelope(self) -> Result<CoolEventEnvelope, CoolError> {
116        let _ = self.attempts;
117        let _ = &self.last_error;
118        Ok(CoolEventEnvelope {
119            event_id: self.event_id,
120            model: self.model,
121            operation: ModelEventKind::parse(&self.operation)?,
122            occurred_at: self.occurred_at,
123            data: self.payload,
124        })
125    }
126}
127
128pub(crate) async fn ensure_event_outbox_table<'e, E>(executor: E) -> Result<(), CoolError>
129where
130    E: sqlx::Executor<'e, Database = sqlx::Postgres>,
131{
132    sqlx::query(
133        "CREATE TABLE IF NOT EXISTS cratestack_event_outbox (\
134            event_id UUID PRIMARY KEY, \
135            model TEXT NOT NULL, \
136            operation TEXT NOT NULL, \
137            occurred_at TIMESTAMPTZ NOT NULL, \
138            payload JSONB NOT NULL, \
139            delivered_at TIMESTAMPTZ, \
140            attempts BIGINT NOT NULL DEFAULT 0, \
141            last_error TEXT\
142        )",
143    )
144    .execute(executor)
145    .await
146    .map_err(cool_error_from_sqlx)?;
147
148    Ok(())
149}
150
151pub(crate) async fn enqueue_event_outbox<'e, E, T>(
152    executor: E,
153    model: &'static str,
154    operation: ModelEventKind,
155    data: &T,
156) -> Result<(), CoolError>
157where
158    E: sqlx::Executor<'e, Database = sqlx::Postgres>,
159    T: serde::Serialize,
160{
161    let payload = serde_json::to_value(data)
162        .map_err(|error| CoolError::Codec(format!("failed to encode event payload: {error}")))?;
163    sqlx::query(
164        "INSERT INTO cratestack_event_outbox (event_id, model, operation, occurred_at, payload) \
165         VALUES ($1, $2, $3, $4, $5)",
166    )
167    .bind(uuid::Uuid::new_v4())
168    .bind(model)
169    .bind(operation.as_str())
170    .bind(chrono::Utc::now())
171    .bind(payload)
172    .execute(executor)
173    .await
174    .map_err(cool_error_from_sqlx)?;
175
176    Ok(())
177}