cratestack_sqlx/
descriptor.rs1use cratestack_core::{
2 CoolError, CoolEventBus, CoolEventEnvelope, CoolEventFuture, ModelEventKind,
3};
4
5#[derive(Debug, Clone)]
6pub struct SqlxRuntime {
7 pool: sqlx::PgPool,
8 events: CoolEventBus,
9}
10
11impl SqlxRuntime {
12 pub fn new(pool: sqlx::PgPool) -> Self {
13 Self {
14 pool,
15 events: CoolEventBus::default(),
16 }
17 }
18
19 pub fn pool(&self) -> &sqlx::PgPool {
20 &self.pool
21 }
22
23 #[doc(hidden)]
24 pub fn subscribe<F>(&self, model: &'static str, operation: ModelEventKind, handler: F)
25 where
26 F: Fn(CoolEventEnvelope) -> CoolEventFuture + Send + Sync + 'static,
27 {
28 self.events.subscribe(model, operation, handler);
29 }
30
31 #[doc(hidden)]
32 pub async fn drain_event_outbox(&self) -> Result<usize, CoolError> {
33 ensure_event_outbox_table(&self.pool).await?;
34
35 let rows = sqlx::query_as::<_, EventOutboxRow>(
36 "SELECT event_id, model, operation, occurred_at, payload, attempts, last_error \
37 FROM cratestack_event_outbox \
38 WHERE delivered_at IS NULL \
39 ORDER BY occurred_at ASC, event_id ASC",
40 )
41 .fetch_all(&self.pool)
42 .await
43 .map_err(|error| CoolError::Database(error.to_string()))?;
44
45 let mut delivered = 0usize;
46 for row in rows {
47 let event_id = row.event_id;
48 let envelope = row.try_into_envelope()?;
49 match self.events.emit(envelope).await {
50 Ok(()) => {
51 sqlx::query(
52 "UPDATE cratestack_event_outbox \
53 SET delivered_at = NOW(), last_error = NULL, attempts = attempts + 1 \
54 WHERE event_id = $1",
55 )
56 .bind(event_id)
57 .execute(&self.pool)
58 .await
59 .map_err(|error| CoolError::Database(error.to_string()))?;
60 delivered += 1;
61 }
62 Err(error) => {
63 sqlx::query(
64 "UPDATE cratestack_event_outbox \
65 SET attempts = attempts + 1, last_error = $2 \
66 WHERE event_id = $1",
67 )
68 .bind(event_id)
69 .bind(error.to_string())
70 .execute(&self.pool)
71 .await
72 .map_err(|db_error| CoolError::Database(db_error.to_string()))?;
73 }
74 }
75 }
76
77 Ok(delivered)
78 }
79}
80
81#[derive(Debug, Clone, sqlx::FromRow)]
82pub(crate) struct EventOutboxRow {
83 pub(crate) event_id: uuid::Uuid,
84 pub(crate) model: String,
85 pub(crate) operation: String,
86 pub(crate) occurred_at: chrono::DateTime<chrono::Utc>,
87 pub(crate) payload: serde_json::Value,
88 pub(crate) attempts: i64,
89 pub(crate) last_error: Option<String>,
90}
91
92impl EventOutboxRow {
93 pub(crate) fn try_into_envelope(self) -> Result<CoolEventEnvelope, CoolError> {
94 let _ = self.attempts;
95 let _ = &self.last_error;
96 Ok(CoolEventEnvelope {
97 event_id: self.event_id,
98 model: self.model,
99 operation: ModelEventKind::parse(&self.operation)?,
100 occurred_at: self.occurred_at,
101 data: self.payload,
102 })
103 }
104}
105
106pub(crate) async fn ensure_event_outbox_table<'e, E>(executor: E) -> Result<(), CoolError>
107where
108 E: sqlx::Executor<'e, Database = sqlx::Postgres>,
109{
110 sqlx::query(
111 "CREATE TABLE IF NOT EXISTS cratestack_event_outbox (\
112 event_id UUID PRIMARY KEY, \
113 model TEXT NOT NULL, \
114 operation TEXT NOT NULL, \
115 occurred_at TIMESTAMPTZ NOT NULL, \
116 payload JSONB NOT NULL, \
117 delivered_at TIMESTAMPTZ, \
118 attempts BIGINT NOT NULL DEFAULT 0, \
119 last_error TEXT\
120 )",
121 )
122 .execute(executor)
123 .await
124 .map_err(|error| CoolError::Database(error.to_string()))?;
125
126 Ok(())
127}
128
129pub(crate) async fn enqueue_event_outbox<'e, E, T>(
130 executor: E,
131 model: &'static str,
132 operation: ModelEventKind,
133 data: &T,
134) -> Result<(), CoolError>
135where
136 E: sqlx::Executor<'e, Database = sqlx::Postgres>,
137 T: serde::Serialize,
138{
139 let payload = serde_json::to_value(data)
140 .map_err(|error| CoolError::Codec(format!("failed to encode event payload: {error}")))?;
141 sqlx::query(
142 "INSERT INTO cratestack_event_outbox (event_id, model, operation, occurred_at, payload) \
143 VALUES ($1, $2, $3, $4, $5)",
144 )
145 .bind(uuid::Uuid::new_v4())
146 .bind(model)
147 .bind(operation.as_str())
148 .bind(chrono::Utc::now())
149 .bind(payload)
150 .execute(executor)
151 .await
152 .map_err(|error| CoolError::Database(error.to_string()))?;
153
154 Ok(())
155}