1use std::fmt::Write;
2use std::marker::PhantomData;
3
4use cratestack_core::{
5 CoolError, CoolEventBus, CoolEventEnvelope, CoolEventFuture, ModelEventKind,
6};
7use cratestack_policy::ReadPolicy;
8
9#[derive(Debug, Clone)]
10pub struct SqlxRuntime {
11 pool: sqlx::PgPool,
12 events: CoolEventBus,
13}
14
15impl SqlxRuntime {
16 pub fn new(pool: sqlx::PgPool) -> Self {
17 Self {
18 pool,
19 events: CoolEventBus::default(),
20 }
21 }
22
23 pub fn pool(&self) -> &sqlx::PgPool {
24 &self.pool
25 }
26
27 #[doc(hidden)]
28 pub fn subscribe<F>(&self, model: &'static str, operation: ModelEventKind, handler: F)
29 where
30 F: Fn(CoolEventEnvelope) -> CoolEventFuture + Send + Sync + 'static,
31 {
32 self.events.subscribe(model, operation, handler);
33 }
34
35 #[doc(hidden)]
36 pub async fn drain_event_outbox(&self) -> Result<usize, CoolError> {
37 ensure_event_outbox_table(&self.pool).await?;
38
39 let rows = sqlx::query_as::<_, EventOutboxRow>(
40 "SELECT event_id, model, operation, occurred_at, payload, attempts, last_error \
41 FROM cratestack_event_outbox \
42 WHERE delivered_at IS NULL \
43 ORDER BY occurred_at ASC, event_id ASC",
44 )
45 .fetch_all(&self.pool)
46 .await
47 .map_err(|error| CoolError::Database(error.to_string()))?;
48
49 let mut delivered = 0usize;
50 for row in rows {
51 let event_id = row.event_id;
52 let envelope = row.try_into_envelope()?;
53 match self.events.emit(envelope).await {
54 Ok(()) => {
55 sqlx::query(
56 "UPDATE cratestack_event_outbox \
57 SET delivered_at = NOW(), last_error = NULL, attempts = attempts + 1 \
58 WHERE event_id = $1",
59 )
60 .bind(event_id)
61 .execute(&self.pool)
62 .await
63 .map_err(|error| CoolError::Database(error.to_string()))?;
64 delivered += 1;
65 }
66 Err(error) => {
67 sqlx::query(
68 "UPDATE cratestack_event_outbox \
69 SET attempts = attempts + 1, last_error = $2 \
70 WHERE event_id = $1",
71 )
72 .bind(event_id)
73 .bind(error.to_string())
74 .execute(&self.pool)
75 .await
76 .map_err(|db_error| CoolError::Database(db_error.to_string()))?;
77 }
78 }
79 }
80
81 Ok(delivered)
82 }
83}
84
85#[derive(Debug, Clone, sqlx::FromRow)]
86pub(crate) struct EventOutboxRow {
87 pub(crate) event_id: uuid::Uuid,
88 pub(crate) model: String,
89 pub(crate) operation: String,
90 pub(crate) occurred_at: chrono::DateTime<chrono::Utc>,
91 pub(crate) payload: serde_json::Value,
92 pub(crate) attempts: i64,
93 pub(crate) last_error: Option<String>,
94}
95
96impl EventOutboxRow {
97 pub(crate) fn try_into_envelope(self) -> Result<CoolEventEnvelope, CoolError> {
98 let _ = self.attempts;
99 let _ = &self.last_error;
100 Ok(CoolEventEnvelope {
101 event_id: self.event_id,
102 model: self.model,
103 operation: ModelEventKind::parse(&self.operation)?,
104 occurred_at: self.occurred_at,
105 data: self.payload,
106 })
107 }
108}
109
110pub(crate) async fn ensure_event_outbox_table<'e, E>(executor: E) -> Result<(), CoolError>
111where
112 E: sqlx::Executor<'e, Database = sqlx::Postgres>,
113{
114 sqlx::query(
115 "CREATE TABLE IF NOT EXISTS cratestack_event_outbox (\
116 event_id UUID PRIMARY KEY, \
117 model TEXT NOT NULL, \
118 operation TEXT NOT NULL, \
119 occurred_at TIMESTAMPTZ NOT NULL, \
120 payload JSONB NOT NULL, \
121 delivered_at TIMESTAMPTZ, \
122 attempts BIGINT NOT NULL DEFAULT 0, \
123 last_error TEXT\
124 )",
125 )
126 .execute(executor)
127 .await
128 .map_err(|error| CoolError::Database(error.to_string()))?;
129
130 Ok(())
131}
132
133pub(crate) async fn enqueue_event_outbox<'e, E, T>(
134 executor: E,
135 model: &'static str,
136 operation: ModelEventKind,
137 data: &T,
138) -> Result<(), CoolError>
139where
140 E: sqlx::Executor<'e, Database = sqlx::Postgres>,
141 T: serde::Serialize,
142{
143 let payload = serde_json::to_value(data)
144 .map_err(|error| CoolError::Codec(format!("failed to encode event payload: {error}")))?;
145 sqlx::query(
146 "INSERT INTO cratestack_event_outbox (event_id, model, operation, occurred_at, payload) \
147 VALUES ($1, $2, $3, $4, $5)",
148 )
149 .bind(uuid::Uuid::new_v4())
150 .bind(model)
151 .bind(operation.as_str())
152 .bind(chrono::Utc::now())
153 .bind(payload)
154 .execute(executor)
155 .await
156 .map_err(|error| CoolError::Database(error.to_string()))?;
157
158 Ok(())
159}
160
161#[derive(Debug, Clone, Copy, PartialEq, Eq)]
162pub struct ModelColumn {
163 pub rust_name: &'static str,
164 pub sql_name: &'static str,
165}
166
167#[derive(Debug, Clone, Copy)]
168pub struct ModelDescriptor<M, PK> {
169 pub schema_name: &'static str,
170 pub table_name: &'static str,
171 pub columns: &'static [ModelColumn],
172 pub primary_key: &'static str,
173 pub allowed_fields: &'static [&'static str],
174 pub allowed_includes: &'static [&'static str],
175 pub allowed_sorts: &'static [&'static str],
176 pub read_allow_policies: &'static [ReadPolicy],
177 pub read_deny_policies: &'static [ReadPolicy],
178 pub detail_allow_policies: &'static [ReadPolicy],
179 pub detail_deny_policies: &'static [ReadPolicy],
180 pub create_allow_policies: &'static [ReadPolicy],
181 pub create_deny_policies: &'static [ReadPolicy],
182 pub update_allow_policies: &'static [ReadPolicy],
183 pub update_deny_policies: &'static [ReadPolicy],
184 pub delete_allow_policies: &'static [ReadPolicy],
185 pub delete_deny_policies: &'static [ReadPolicy],
186 pub create_defaults: &'static [CreateDefault],
187 pub emitted_events: &'static [ModelEventKind],
188 pub version_column: Option<&'static str>,
192 pub audit_enabled: bool,
196 pub pii_columns: &'static [&'static str],
201 pub sensitive_columns: &'static [&'static str],
204 pub soft_delete_column: Option<&'static str>,
210 pub retention_days: Option<u32>,
215 _marker: PhantomData<fn() -> (M, PK)>,
216}
217
218#[derive(Debug, Clone, Copy, PartialEq, Eq)]
219pub enum CreateDefaultType {
220 Bool,
221 Int,
222 String,
223}
224
225#[derive(Debug, Clone, Copy, PartialEq, Eq)]
226pub struct CreateDefault {
227 pub column: &'static str,
228 pub auth_field: &'static str,
229 pub ty: CreateDefaultType,
230 pub nullable: bool,
231}
232
233impl<M, PK> ModelDescriptor<M, PK> {
234 pub const fn new(
235 schema_name: &'static str,
236 table_name: &'static str,
237 columns: &'static [ModelColumn],
238 primary_key: &'static str,
239 allowed_fields: &'static [&'static str],
240 allowed_includes: &'static [&'static str],
241 allowed_sorts: &'static [&'static str],
242 read_allow_policies: &'static [ReadPolicy],
243 read_deny_policies: &'static [ReadPolicy],
244 detail_allow_policies: &'static [ReadPolicy],
245 detail_deny_policies: &'static [ReadPolicy],
246 create_allow_policies: &'static [ReadPolicy],
247 create_deny_policies: &'static [ReadPolicy],
248 update_allow_policies: &'static [ReadPolicy],
249 update_deny_policies: &'static [ReadPolicy],
250 delete_allow_policies: &'static [ReadPolicy],
251 delete_deny_policies: &'static [ReadPolicy],
252 create_defaults: &'static [CreateDefault],
253 emitted_events: &'static [ModelEventKind],
254 version_column: Option<&'static str>,
255 audit_enabled: bool,
256 pii_columns: &'static [&'static str],
257 sensitive_columns: &'static [&'static str],
258 soft_delete_column: Option<&'static str>,
259 retention_days: Option<u32>,
260 ) -> Self {
261 Self {
262 schema_name,
263 table_name,
264 columns,
265 primary_key,
266 allowed_fields,
267 allowed_includes,
268 allowed_sorts,
269 read_allow_policies,
270 read_deny_policies,
271 detail_allow_policies,
272 detail_deny_policies,
273 create_allow_policies,
274 create_deny_policies,
275 update_allow_policies,
276 update_deny_policies,
277 delete_allow_policies,
278 delete_deny_policies,
279 create_defaults,
280 emitted_events,
281 version_column,
282 audit_enabled,
283 pii_columns,
284 sensitive_columns,
285 soft_delete_column,
286 retention_days,
287 _marker: PhantomData,
288 }
289 }
290
291 pub fn emits(&self, operation: ModelEventKind) -> bool {
292 self.emitted_events.contains(&operation)
293 }
294
295 pub fn select_projection(&self) -> String {
296 let mut sql = String::new();
297 for (index, column) in self.columns.iter().enumerate() {
298 if index > 0 {
299 sql.push_str(", ");
300 }
301 let _ = write!(sql, "{} AS \"{}\"", column.sql_name, column.rust_name);
302 }
303 sql
304 }
305}