cratestack_sqlx/
descriptor.rs1use crate::sqlx;
2
3use cratestack_core::{
4 CoolError, CoolEventBus, CoolEventEnvelope, CoolEventFuture, ModelEventKind,
5};
6
7use crate::error::cool_error_from_sqlx;
8
9#[derive(Debug, Clone)]
10pub struct SqlxRuntime {
11 pool: sqlx::PgPool,
12 events: CoolEventBus,
13}
14
15impl SqlxRuntime {
16 pub fn new(pool: sqlx::PgPool) -> Self {
17 Self {
18 pool,
19 events: CoolEventBus::default(),
20 }
21 }
22
23 pub fn pool(&self) -> &sqlx::PgPool {
24 &self.pool
25 }
26
27 #[doc(hidden)]
28 pub fn subscribe<F>(&self, model: &'static str, operation: ModelEventKind, handler: F)
29 where
30 F: Fn(CoolEventEnvelope) -> CoolEventFuture + Send + Sync + 'static,
31 {
32 self.events.subscribe(model, operation, handler);
33 }
34
35 #[doc(hidden)]
36 pub async fn drain_event_outbox(&self) -> Result<usize, CoolError> {
37 ensure_event_outbox_table(&self.pool).await?;
38
39 let rows = sqlx::query_as::<_, EventOutboxRow>(
40 "SELECT event_id, model, operation, occurred_at, payload, attempts, last_error \
41 FROM cratestack_event_outbox \
42 WHERE delivered_at IS NULL \
43 ORDER BY occurred_at ASC, event_id ASC",
44 )
45 .fetch_all(&self.pool)
46 .await
47 .map_err(cool_error_from_sqlx)?;
48
49 let mut delivered = 0usize;
50 for row in rows {
51 let event_id = row.event_id;
52 let envelope = row.try_into_envelope()?;
53 match self.events.emit(envelope).await {
54 Ok(()) => {
55 sqlx::query(
56 "UPDATE cratestack_event_outbox \
57 SET delivered_at = NOW(), last_error = NULL, attempts = attempts + 1 \
58 WHERE event_id = $1",
59 )
60 .bind(event_id)
61 .execute(&self.pool)
62 .await
63 .map_err(cool_error_from_sqlx)?;
64 delivered += 1;
65 }
66 Err(error) => {
67 sqlx::query(
68 "UPDATE cratestack_event_outbox \
69 SET attempts = attempts + 1, last_error = $2 \
70 WHERE event_id = $1",
71 )
72 .bind(event_id)
73 .bind(error.to_string())
74 .execute(&self.pool)
75 .await
76 .map_err(cool_error_from_sqlx)?;
77 }
78 }
79 }
80
81 Ok(delivered)
82 }
83}
84
85#[derive(Debug, Clone)]
86pub(crate) struct EventOutboxRow {
87 pub(crate) event_id: uuid::Uuid,
88 pub(crate) model: String,
89 pub(crate) operation: String,
90 pub(crate) occurred_at: chrono::DateTime<chrono::Utc>,
91 pub(crate) payload: serde_json::Value,
92 pub(crate) attempts: i64,
93 pub(crate) last_error: Option<String>,
94}
95
96impl<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> for EventOutboxRow {
100 fn from_row(row: &'r sqlx::postgres::PgRow) -> Result<Self, sqlx::Error> {
101 use sqlx::Row;
102 Ok(Self {
103 event_id: row.try_get("event_id")?,
104 model: row.try_get("model")?,
105 operation: row.try_get("operation")?,
106 occurred_at: row.try_get("occurred_at")?,
107 payload: row.try_get("payload")?,
108 attempts: row.try_get("attempts")?,
109 last_error: row.try_get("last_error")?,
110 })
111 }
112}
113
114impl EventOutboxRow {
115 pub(crate) fn try_into_envelope(self) -> Result<CoolEventEnvelope, CoolError> {
116 let _ = self.attempts;
117 let _ = &self.last_error;
118 Ok(CoolEventEnvelope {
119 event_id: self.event_id,
120 model: self.model,
121 operation: ModelEventKind::parse(&self.operation)?,
122 occurred_at: self.occurred_at,
123 data: self.payload,
124 })
125 }
126}
127
128pub(crate) async fn ensure_event_outbox_table<'e, E>(executor: E) -> Result<(), CoolError>
129where
130 E: sqlx::Executor<'e, Database = sqlx::Postgres>,
131{
132 sqlx::query(
133 "CREATE TABLE IF NOT EXISTS cratestack_event_outbox (\
134 event_id UUID PRIMARY KEY, \
135 model TEXT NOT NULL, \
136 operation TEXT NOT NULL, \
137 occurred_at TIMESTAMPTZ NOT NULL, \
138 payload JSONB NOT NULL, \
139 delivered_at TIMESTAMPTZ, \
140 attempts BIGINT NOT NULL DEFAULT 0, \
141 last_error TEXT\
142 )",
143 )
144 .execute(executor)
145 .await
146 .map_err(cool_error_from_sqlx)?;
147
148 Ok(())
149}
150
151pub(crate) async fn enqueue_event_outbox<'e, E, T>(
152 executor: E,
153 model: &'static str,
154 operation: ModelEventKind,
155 data: &T,
156) -> Result<(), CoolError>
157where
158 E: sqlx::Executor<'e, Database = sqlx::Postgres>,
159 T: serde::Serialize,
160{
161 let payload = serde_json::to_value(data)
162 .map_err(|error| CoolError::Codec(format!("failed to encode event payload: {error}")))?;
163 sqlx::query(
164 "INSERT INTO cratestack_event_outbox (event_id, model, operation, occurred_at, payload) \
165 VALUES ($1, $2, $3, $4, $5)",
166 )
167 .bind(uuid::Uuid::new_v4())
168 .bind(model)
169 .bind(operation.as_str())
170 .bind(chrono::Utc::now())
171 .bind(payload)
172 .execute(executor)
173 .await
174 .map_err(cool_error_from_sqlx)?;
175
176 Ok(())
177}