Skip to main content

cratestack_sqlx/
descriptor.rs

1use cratestack_core::{
2    CoolError, CoolEventBus, CoolEventEnvelope, CoolEventFuture, ModelEventKind,
3};
4
5#[derive(Debug, Clone)]
6pub struct SqlxRuntime {
7    pool: sqlx::PgPool,
8    events: CoolEventBus,
9}
10
11impl SqlxRuntime {
12    pub fn new(pool: sqlx::PgPool) -> Self {
13        Self {
14            pool,
15            events: CoolEventBus::default(),
16        }
17    }
18
19    pub fn pool(&self) -> &sqlx::PgPool {
20        &self.pool
21    }
22
23    #[doc(hidden)]
24    pub fn subscribe<F>(&self, model: &'static str, operation: ModelEventKind, handler: F)
25    where
26        F: Fn(CoolEventEnvelope) -> CoolEventFuture + Send + Sync + 'static,
27    {
28        self.events.subscribe(model, operation, handler);
29    }
30
31    #[doc(hidden)]
32    pub async fn drain_event_outbox(&self) -> Result<usize, CoolError> {
33        ensure_event_outbox_table(&self.pool).await?;
34
35        let rows = sqlx::query_as::<_, EventOutboxRow>(
36            "SELECT event_id, model, operation, occurred_at, payload, attempts, last_error \
37             FROM cratestack_event_outbox \
38             WHERE delivered_at IS NULL \
39             ORDER BY occurred_at ASC, event_id ASC",
40        )
41        .fetch_all(&self.pool)
42        .await
43        .map_err(|error| CoolError::Database(error.to_string()))?;
44
45        let mut delivered = 0usize;
46        for row in rows {
47            let event_id = row.event_id;
48            let envelope = row.try_into_envelope()?;
49            match self.events.emit(envelope).await {
50                Ok(()) => {
51                    sqlx::query(
52                        "UPDATE cratestack_event_outbox \
53                         SET delivered_at = NOW(), last_error = NULL, attempts = attempts + 1 \
54                         WHERE event_id = $1",
55                    )
56                    .bind(event_id)
57                    .execute(&self.pool)
58                    .await
59                    .map_err(|error| CoolError::Database(error.to_string()))?;
60                    delivered += 1;
61                }
62                Err(error) => {
63                    sqlx::query(
64                        "UPDATE cratestack_event_outbox \
65                         SET attempts = attempts + 1, last_error = $2 \
66                         WHERE event_id = $1",
67                    )
68                    .bind(event_id)
69                    .bind(error.to_string())
70                    .execute(&self.pool)
71                    .await
72                    .map_err(|db_error| CoolError::Database(db_error.to_string()))?;
73                }
74            }
75        }
76
77        Ok(delivered)
78    }
79}
80
81#[derive(Debug, Clone, sqlx::FromRow)]
82pub(crate) struct EventOutboxRow {
83    pub(crate) event_id: uuid::Uuid,
84    pub(crate) model: String,
85    pub(crate) operation: String,
86    pub(crate) occurred_at: chrono::DateTime<chrono::Utc>,
87    pub(crate) payload: serde_json::Value,
88    pub(crate) attempts: i64,
89    pub(crate) last_error: Option<String>,
90}
91
92impl EventOutboxRow {
93    pub(crate) fn try_into_envelope(self) -> Result<CoolEventEnvelope, CoolError> {
94        let _ = self.attempts;
95        let _ = &self.last_error;
96        Ok(CoolEventEnvelope {
97            event_id: self.event_id,
98            model: self.model,
99            operation: ModelEventKind::parse(&self.operation)?,
100            occurred_at: self.occurred_at,
101            data: self.payload,
102        })
103    }
104}
105
106pub(crate) async fn ensure_event_outbox_table<'e, E>(executor: E) -> Result<(), CoolError>
107where
108    E: sqlx::Executor<'e, Database = sqlx::Postgres>,
109{
110    sqlx::query(
111        "CREATE TABLE IF NOT EXISTS cratestack_event_outbox (\
112            event_id UUID PRIMARY KEY, \
113            model TEXT NOT NULL, \
114            operation TEXT NOT NULL, \
115            occurred_at TIMESTAMPTZ NOT NULL, \
116            payload JSONB NOT NULL, \
117            delivered_at TIMESTAMPTZ, \
118            attempts BIGINT NOT NULL DEFAULT 0, \
119            last_error TEXT\
120        )",
121    )
122    .execute(executor)
123    .await
124    .map_err(|error| CoolError::Database(error.to_string()))?;
125
126    Ok(())
127}
128
129pub(crate) async fn enqueue_event_outbox<'e, E, T>(
130    executor: E,
131    model: &'static str,
132    operation: ModelEventKind,
133    data: &T,
134) -> Result<(), CoolError>
135where
136    E: sqlx::Executor<'e, Database = sqlx::Postgres>,
137    T: serde::Serialize,
138{
139    let payload = serde_json::to_value(data)
140        .map_err(|error| CoolError::Codec(format!("failed to encode event payload: {error}")))?;
141    sqlx::query(
142        "INSERT INTO cratestack_event_outbox (event_id, model, operation, occurred_at, payload) \
143         VALUES ($1, $2, $3, $4, $5)",
144    )
145    .bind(uuid::Uuid::new_v4())
146    .bind(model)
147    .bind(operation.as_str())
148    .bind(chrono::Utc::now())
149    .bind(payload)
150    .execute(executor)
151    .await
152    .map_err(|error| CoolError::Database(error.to_string()))?;
153
154    Ok(())
155}