Skip to main content

cratestack_sqlx/
descriptor.rs

1use crate::sqlx;
2
3use cratestack_core::{
4    CoolError, CoolEventBus, CoolEventEnvelope, CoolEventFuture, ModelEventKind,
5};
6
7#[derive(Debug, Clone)]
8pub struct SqlxRuntime {
9    pool: sqlx::PgPool,
10    events: CoolEventBus,
11}
12
13impl SqlxRuntime {
14    pub fn new(pool: sqlx::PgPool) -> Self {
15        Self {
16            pool,
17            events: CoolEventBus::default(),
18        }
19    }
20
21    pub fn pool(&self) -> &sqlx::PgPool {
22        &self.pool
23    }
24
25    #[doc(hidden)]
26    pub fn subscribe<F>(&self, model: &'static str, operation: ModelEventKind, handler: F)
27    where
28        F: Fn(CoolEventEnvelope) -> CoolEventFuture + Send + Sync + 'static,
29    {
30        self.events.subscribe(model, operation, handler);
31    }
32
33    #[doc(hidden)]
34    pub async fn drain_event_outbox(&self) -> Result<usize, CoolError> {
35        ensure_event_outbox_table(&self.pool).await?;
36
37        let rows = sqlx::query_as::<_, EventOutboxRow>(
38            "SELECT event_id, model, operation, occurred_at, payload, attempts, last_error \
39             FROM cratestack_event_outbox \
40             WHERE delivered_at IS NULL \
41             ORDER BY occurred_at ASC, event_id ASC",
42        )
43        .fetch_all(&self.pool)
44        .await
45        .map_err(|error| CoolError::Database(error.to_string()))?;
46
47        let mut delivered = 0usize;
48        for row in rows {
49            let event_id = row.event_id;
50            let envelope = row.try_into_envelope()?;
51            match self.events.emit(envelope).await {
52                Ok(()) => {
53                    sqlx::query(
54                        "UPDATE cratestack_event_outbox \
55                         SET delivered_at = NOW(), last_error = NULL, attempts = attempts + 1 \
56                         WHERE event_id = $1",
57                    )
58                    .bind(event_id)
59                    .execute(&self.pool)
60                    .await
61                    .map_err(|error| CoolError::Database(error.to_string()))?;
62                    delivered += 1;
63                }
64                Err(error) => {
65                    sqlx::query(
66                        "UPDATE cratestack_event_outbox \
67                         SET attempts = attempts + 1, last_error = $2 \
68                         WHERE event_id = $1",
69                    )
70                    .bind(event_id)
71                    .bind(error.to_string())
72                    .execute(&self.pool)
73                    .await
74                    .map_err(|db_error| CoolError::Database(db_error.to_string()))?;
75                }
76            }
77        }
78
79        Ok(delivered)
80    }
81}
82
83#[derive(Debug, Clone)]
84pub(crate) struct EventOutboxRow {
85    pub(crate) event_id: uuid::Uuid,
86    pub(crate) model: String,
87    pub(crate) operation: String,
88    pub(crate) occurred_at: chrono::DateTime<chrono::Utc>,
89    pub(crate) payload: serde_json::Value,
90    pub(crate) attempts: i64,
91    pub(crate) last_error: Option<String>,
92}
93
94// Hand-written `FromRow` impl. We can't use `#[derive(sqlx::FromRow)]` because
95// the derive macro hardcodes `::sqlx::*` paths that don't resolve through our
96// `crate::sqlx` shim (the shim is module-scoped, not crate-aliased).
97impl<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> for EventOutboxRow {
98    fn from_row(row: &'r sqlx::postgres::PgRow) -> Result<Self, sqlx::Error> {
99        use sqlx::Row;
100        Ok(Self {
101            event_id: row.try_get("event_id")?,
102            model: row.try_get("model")?,
103            operation: row.try_get("operation")?,
104            occurred_at: row.try_get("occurred_at")?,
105            payload: row.try_get("payload")?,
106            attempts: row.try_get("attempts")?,
107            last_error: row.try_get("last_error")?,
108        })
109    }
110}
111
112impl EventOutboxRow {
113    pub(crate) fn try_into_envelope(self) -> Result<CoolEventEnvelope, CoolError> {
114        let _ = self.attempts;
115        let _ = &self.last_error;
116        Ok(CoolEventEnvelope {
117            event_id: self.event_id,
118            model: self.model,
119            operation: ModelEventKind::parse(&self.operation)?,
120            occurred_at: self.occurred_at,
121            data: self.payload,
122        })
123    }
124}
125
126pub(crate) async fn ensure_event_outbox_table<'e, E>(executor: E) -> Result<(), CoolError>
127where
128    E: sqlx::Executor<'e, Database = sqlx::Postgres>,
129{
130    sqlx::query(
131        "CREATE TABLE IF NOT EXISTS cratestack_event_outbox (\
132            event_id UUID PRIMARY KEY, \
133            model TEXT NOT NULL, \
134            operation TEXT NOT NULL, \
135            occurred_at TIMESTAMPTZ NOT NULL, \
136            payload JSONB NOT NULL, \
137            delivered_at TIMESTAMPTZ, \
138            attempts BIGINT NOT NULL DEFAULT 0, \
139            last_error TEXT\
140        )",
141    )
142    .execute(executor)
143    .await
144    .map_err(|error| CoolError::Database(error.to_string()))?;
145
146    Ok(())
147}
148
149pub(crate) async fn enqueue_event_outbox<'e, E, T>(
150    executor: E,
151    model: &'static str,
152    operation: ModelEventKind,
153    data: &T,
154) -> Result<(), CoolError>
155where
156    E: sqlx::Executor<'e, Database = sqlx::Postgres>,
157    T: serde::Serialize,
158{
159    let payload = serde_json::to_value(data)
160        .map_err(|error| CoolError::Codec(format!("failed to encode event payload: {error}")))?;
161    sqlx::query(
162        "INSERT INTO cratestack_event_outbox (event_id, model, operation, occurred_at, payload) \
163         VALUES ($1, $2, $3, $4, $5)",
164    )
165    .bind(uuid::Uuid::new_v4())
166    .bind(model)
167    .bind(operation.as_str())
168    .bind(chrono::Utc::now())
169    .bind(payload)
170    .execute(executor)
171    .await
172    .map_err(|error| CoolError::Database(error.to_string()))?;
173
174    Ok(())
175}