cratestack_sqlx/
descriptor.rs1use crate::sqlx;
2
3use cratestack_core::{
4 CoolError, CoolEventBus, CoolEventEnvelope, CoolEventFuture, ModelEventKind,
5};
6
7#[derive(Debug, Clone)]
8pub struct SqlxRuntime {
9 pool: sqlx::PgPool,
10 events: CoolEventBus,
11}
12
13impl SqlxRuntime {
14 pub fn new(pool: sqlx::PgPool) -> Self {
15 Self {
16 pool,
17 events: CoolEventBus::default(),
18 }
19 }
20
21 pub fn pool(&self) -> &sqlx::PgPool {
22 &self.pool
23 }
24
25 #[doc(hidden)]
26 pub fn subscribe<F>(&self, model: &'static str, operation: ModelEventKind, handler: F)
27 where
28 F: Fn(CoolEventEnvelope) -> CoolEventFuture + Send + Sync + 'static,
29 {
30 self.events.subscribe(model, operation, handler);
31 }
32
33 #[doc(hidden)]
34 pub async fn drain_event_outbox(&self) -> Result<usize, CoolError> {
35 ensure_event_outbox_table(&self.pool).await?;
36
37 let rows = sqlx::query_as::<_, EventOutboxRow>(
38 "SELECT event_id, model, operation, occurred_at, payload, attempts, last_error \
39 FROM cratestack_event_outbox \
40 WHERE delivered_at IS NULL \
41 ORDER BY occurred_at ASC, event_id ASC",
42 )
43 .fetch_all(&self.pool)
44 .await
45 .map_err(|error| CoolError::Database(error.to_string()))?;
46
47 let mut delivered = 0usize;
48 for row in rows {
49 let event_id = row.event_id;
50 let envelope = row.try_into_envelope()?;
51 match self.events.emit(envelope).await {
52 Ok(()) => {
53 sqlx::query(
54 "UPDATE cratestack_event_outbox \
55 SET delivered_at = NOW(), last_error = NULL, attempts = attempts + 1 \
56 WHERE event_id = $1",
57 )
58 .bind(event_id)
59 .execute(&self.pool)
60 .await
61 .map_err(|error| CoolError::Database(error.to_string()))?;
62 delivered += 1;
63 }
64 Err(error) => {
65 sqlx::query(
66 "UPDATE cratestack_event_outbox \
67 SET attempts = attempts + 1, last_error = $2 \
68 WHERE event_id = $1",
69 )
70 .bind(event_id)
71 .bind(error.to_string())
72 .execute(&self.pool)
73 .await
74 .map_err(|db_error| CoolError::Database(db_error.to_string()))?;
75 }
76 }
77 }
78
79 Ok(delivered)
80 }
81}
82
83#[derive(Debug, Clone)]
84pub(crate) struct EventOutboxRow {
85 pub(crate) event_id: uuid::Uuid,
86 pub(crate) model: String,
87 pub(crate) operation: String,
88 pub(crate) occurred_at: chrono::DateTime<chrono::Utc>,
89 pub(crate) payload: serde_json::Value,
90 pub(crate) attempts: i64,
91 pub(crate) last_error: Option<String>,
92}
93
94impl<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> for EventOutboxRow {
98 fn from_row(row: &'r sqlx::postgres::PgRow) -> Result<Self, sqlx::Error> {
99 use sqlx::Row;
100 Ok(Self {
101 event_id: row.try_get("event_id")?,
102 model: row.try_get("model")?,
103 operation: row.try_get("operation")?,
104 occurred_at: row.try_get("occurred_at")?,
105 payload: row.try_get("payload")?,
106 attempts: row.try_get("attempts")?,
107 last_error: row.try_get("last_error")?,
108 })
109 }
110}
111
112impl EventOutboxRow {
113 pub(crate) fn try_into_envelope(self) -> Result<CoolEventEnvelope, CoolError> {
114 let _ = self.attempts;
115 let _ = &self.last_error;
116 Ok(CoolEventEnvelope {
117 event_id: self.event_id,
118 model: self.model,
119 operation: ModelEventKind::parse(&self.operation)?,
120 occurred_at: self.occurred_at,
121 data: self.payload,
122 })
123 }
124}
125
126pub(crate) async fn ensure_event_outbox_table<'e, E>(executor: E) -> Result<(), CoolError>
127where
128 E: sqlx::Executor<'e, Database = sqlx::Postgres>,
129{
130 sqlx::query(
131 "CREATE TABLE IF NOT EXISTS cratestack_event_outbox (\
132 event_id UUID PRIMARY KEY, \
133 model TEXT NOT NULL, \
134 operation TEXT NOT NULL, \
135 occurred_at TIMESTAMPTZ NOT NULL, \
136 payload JSONB NOT NULL, \
137 delivered_at TIMESTAMPTZ, \
138 attempts BIGINT NOT NULL DEFAULT 0, \
139 last_error TEXT\
140 )",
141 )
142 .execute(executor)
143 .await
144 .map_err(|error| CoolError::Database(error.to_string()))?;
145
146 Ok(())
147}
148
149pub(crate) async fn enqueue_event_outbox<'e, E, T>(
150 executor: E,
151 model: &'static str,
152 operation: ModelEventKind,
153 data: &T,
154) -> Result<(), CoolError>
155where
156 E: sqlx::Executor<'e, Database = sqlx::Postgres>,
157 T: serde::Serialize,
158{
159 let payload = serde_json::to_value(data)
160 .map_err(|error| CoolError::Codec(format!("failed to encode event payload: {error}")))?;
161 sqlx::query(
162 "INSERT INTO cratestack_event_outbox (event_id, model, operation, occurred_at, payload) \
163 VALUES ($1, $2, $3, $4, $5)",
164 )
165 .bind(uuid::Uuid::new_v4())
166 .bind(model)
167 .bind(operation.as_str())
168 .bind(chrono::Utc::now())
169 .bind(payload)
170 .execute(executor)
171 .await
172 .map_err(|error| CoolError::Database(error.to_string()))?;
173
174 Ok(())
175}