Skip to main content

cratestack_sqlx/
descriptor.rs

1use std::fmt::Write;
2use std::marker::PhantomData;
3
4use cratestack_core::{
5    CoolError, CoolEventBus, CoolEventEnvelope, CoolEventFuture, ModelEventKind,
6};
7use cratestack_policy::ReadPolicy;
8
/// Runtime handle bundling a Postgres connection pool with the event bus
/// used to deliver outbox events to subscribers.
#[derive(Debug, Clone)]
pub struct SqlxRuntime {
    /// Connection pool used for every query issued by this runtime.
    pool: sqlx::PgPool,
    /// Bus on which handlers are registered and outbox events are emitted.
    events: CoolEventBus,
}
14
15impl SqlxRuntime {
16    pub fn new(pool: sqlx::PgPool) -> Self {
17        Self {
18            pool,
19            events: CoolEventBus::default(),
20        }
21    }
22
23    pub fn pool(&self) -> &sqlx::PgPool {
24        &self.pool
25    }
26
27    #[doc(hidden)]
28    pub fn subscribe<F>(&self, model: &'static str, operation: ModelEventKind, handler: F)
29    where
30        F: Fn(CoolEventEnvelope) -> CoolEventFuture + Send + Sync + 'static,
31    {
32        self.events.subscribe(model, operation, handler);
33    }
34
35    #[doc(hidden)]
36    pub async fn drain_event_outbox(&self) -> Result<usize, CoolError> {
37        ensure_event_outbox_table(&self.pool).await?;
38
39        let rows = sqlx::query_as::<_, EventOutboxRow>(
40            "SELECT event_id, model, operation, occurred_at, payload, attempts, last_error \
41             FROM cratestack_event_outbox \
42             WHERE delivered_at IS NULL \
43             ORDER BY occurred_at ASC, event_id ASC",
44        )
45        .fetch_all(&self.pool)
46        .await
47        .map_err(|error| CoolError::Database(error.to_string()))?;
48
49        let mut delivered = 0usize;
50        for row in rows {
51            let event_id = row.event_id;
52            let envelope = row.try_into_envelope()?;
53            match self.events.emit(envelope).await {
54                Ok(()) => {
55                    sqlx::query(
56                        "UPDATE cratestack_event_outbox \
57                         SET delivered_at = NOW(), last_error = NULL, attempts = attempts + 1 \
58                         WHERE event_id = $1",
59                    )
60                    .bind(event_id)
61                    .execute(&self.pool)
62                    .await
63                    .map_err(|error| CoolError::Database(error.to_string()))?;
64                    delivered += 1;
65                }
66                Err(error) => {
67                    sqlx::query(
68                        "UPDATE cratestack_event_outbox \
69                         SET attempts = attempts + 1, last_error = $2 \
70                         WHERE event_id = $1",
71                    )
72                    .bind(event_id)
73                    .bind(error.to_string())
74                    .execute(&self.pool)
75                    .await
76                    .map_err(|db_error| CoolError::Database(db_error.to_string()))?;
77                }
78            }
79        }
80
81        Ok(delivered)
82    }
83}
84
/// One row of `cratestack_event_outbox`, with the columns selected by
/// `SqlxRuntime::drain_event_outbox`.
#[derive(Debug, Clone, sqlx::FromRow)]
pub(crate) struct EventOutboxRow {
    /// Primary key of the outbox row; becomes the envelope's `event_id`.
    pub(crate) event_id: uuid::Uuid,
    /// Name of the model the event belongs to.
    pub(crate) model: String,
    /// Textual event kind; parsed with `ModelEventKind::parse` when the row
    /// is turned into an envelope.
    pub(crate) operation: String,
    /// Timestamp stored when the event was enqueued.
    pub(crate) occurred_at: chrono::DateTime<chrono::Utc>,
    /// JSON payload; becomes the envelope's `data`.
    pub(crate) payload: serde_json::Value,
    /// Number of delivery attempts recorded so far.
    pub(crate) attempts: i64,
    /// Error text of the most recent failed attempt, if any.
    pub(crate) last_error: Option<String>,
}
95
96impl EventOutboxRow {
97    pub(crate) fn try_into_envelope(self) -> Result<CoolEventEnvelope, CoolError> {
98        let _ = self.attempts;
99        let _ = &self.last_error;
100        Ok(CoolEventEnvelope {
101            event_id: self.event_id,
102            model: self.model,
103            operation: ModelEventKind::parse(&self.operation)?,
104            occurred_at: self.occurred_at,
105            data: self.payload,
106        })
107    }
108}
109
110pub(crate) async fn ensure_event_outbox_table<'e, E>(executor: E) -> Result<(), CoolError>
111where
112    E: sqlx::Executor<'e, Database = sqlx::Postgres>,
113{
114    sqlx::query(
115        "CREATE TABLE IF NOT EXISTS cratestack_event_outbox (\
116            event_id UUID PRIMARY KEY, \
117            model TEXT NOT NULL, \
118            operation TEXT NOT NULL, \
119            occurred_at TIMESTAMPTZ NOT NULL, \
120            payload JSONB NOT NULL, \
121            delivered_at TIMESTAMPTZ, \
122            attempts BIGINT NOT NULL DEFAULT 0, \
123            last_error TEXT\
124        )",
125    )
126    .execute(executor)
127    .await
128    .map_err(|error| CoolError::Database(error.to_string()))?;
129
130    Ok(())
131}
132
133pub(crate) async fn enqueue_event_outbox<'e, E, T>(
134    executor: E,
135    model: &'static str,
136    operation: ModelEventKind,
137    data: &T,
138) -> Result<(), CoolError>
139where
140    E: sqlx::Executor<'e, Database = sqlx::Postgres>,
141    T: serde::Serialize,
142{
143    let payload = serde_json::to_value(data)
144        .map_err(|error| CoolError::Codec(format!("failed to encode event payload: {error}")))?;
145    sqlx::query(
146        "INSERT INTO cratestack_event_outbox (event_id, model, operation, occurred_at, payload) \
147         VALUES ($1, $2, $3, $4, $5)",
148    )
149    .bind(uuid::Uuid::new_v4())
150    .bind(model)
151    .bind(operation.as_str())
152    .bind(chrono::Utc::now())
153    .bind(payload)
154    .execute(executor)
155    .await
156    .map_err(|error| CoolError::Database(error.to_string()))?;
157
158    Ok(())
159}
160
/// Pairs a column's SQL name with the name it is aliased to in projections
/// built by `ModelDescriptor::select_projection`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ModelColumn {
    /// Alias emitted (double-quoted) after `AS` in the projection.
    /// Presumably the Rust field name of the model — confirm against the
    /// code generator.
    pub rust_name: &'static str,
    /// Column expression emitted before `AS`; written into the SQL verbatim,
    /// without quoting.
    pub sql_name: &'static str,
}
166
/// Static metadata describing one model: where it lives, which columns it
/// has, which policy sets guard each operation, and which optional features
/// (events, versioning, audit, redaction, soft delete) are switched on.
///
/// `M` and `PK` exist only at the type level (see `_marker`); presumably they
/// are the model type and its primary-key type — confirm against the code
/// generator.
///
/// NOTE(review): because the traits are derived, the `Debug`, `Clone` and
/// `Copy` impls are only available when `M` and `PK` implement the same
/// trait, even though no value of either type is stored.
#[derive(Debug, Clone, Copy)]
pub struct ModelDescriptor<M, PK> {
    /// Schema the model belongs to. NOTE(review): not visible from here
    /// whether this is the database schema or the CrateStack schema name.
    pub schema_name: &'static str,
    /// Name of the table backing the model.
    pub table_name: &'static str,
    /// Columns of the model, in the order `select_projection` emits them.
    pub columns: &'static [ModelColumn],
    /// Primary-key column. NOTE(review): presumably the SQL column name —
    /// confirm.
    pub primary_key: &'static str,
    // The three allow-lists below presumably bound what a request may
    // select, include and sort by — TODO confirm against the query layer.
    pub allowed_fields: &'static [&'static str],
    pub allowed_includes: &'static [&'static str],
    pub allowed_sorts: &'static [&'static str],
    // Allow/deny policy sets, one pair per operation (read, detail, create,
    // update, delete). How allow and deny are combined is decided by the
    // policy evaluator, not by this struct.
    pub read_allow_policies: &'static [ReadPolicy],
    pub read_deny_policies: &'static [ReadPolicy],
    pub detail_allow_policies: &'static [ReadPolicy],
    pub detail_deny_policies: &'static [ReadPolicy],
    pub create_allow_policies: &'static [ReadPolicy],
    pub create_deny_policies: &'static [ReadPolicy],
    pub update_allow_policies: &'static [ReadPolicy],
    pub update_deny_policies: &'static [ReadPolicy],
    pub delete_allow_policies: &'static [ReadPolicy],
    pub delete_deny_policies: &'static [ReadPolicy],
    /// Per-column defaults applied on create; see [`CreateDefault`].
    pub create_defaults: &'static [CreateDefault],
    /// Event kinds this model emits; queried through `emits`.
    pub emitted_events: &'static [ModelEventKind],
    /// Column name of the optimistic-locking version field, set when the
    /// model declares an `@version` field. `None` for non-versioned models,
    /// which keeps update semantics unchanged.
    pub version_column: Option<&'static str>,
    /// `true` when the model declared `@@audit`. Mutations on audit-enabled
    /// models capture before/after snapshots and persist them into
    /// `cratestack_audit` inside the same transaction.
    pub audit_enabled: bool,
    /// SQL column names of fields declared `@pii`. The audit-log writer
    /// replaces these values with `"[redacted-pii]"` in the persisted JSON
    /// snapshots; a follow-up will extend the same redaction to error
    /// detail and tracing.
    pub pii_columns: &'static [&'static str],
    /// SQL column names of fields declared `@sensitive`. Redacted in audit
    /// snapshots as `"[redacted-sensitive]"`.
    pub sensitive_columns: &'static [&'static str],
    /// Column name for the soft-delete timestamp. When `Some`, DELETE
    /// operations become UPDATE-of-`deleted_at` and every SELECT through
    /// `push_scoped_conditions` filters out rows where the column is
    /// non-null. Defaults to `Some("deleted_at")` when `@@soft_delete` is
    /// declared.
    pub soft_delete_column: Option<&'static str>,
    /// Retention window in days for soft-deleted rows. The runtime does
    /// not auto-GC; banks run their own scheduled job that deletes rows
    /// where `deleted_at < NOW() - retention`. Surfaced here so the GC
    /// can read the policy from one place.
    pub retention_days: Option<u32>,
    // Ties the descriptor to `M` and `PK` without storing either; the
    // `fn() -> _` form means the marker never owns a value of those types.
    _marker: PhantomData<fn() -> (M, PK)>,
}
217
/// Value type of a [`CreateDefault`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CreateDefaultType {
    /// Boolean value.
    Bool,
    /// Integer value. NOTE(review): width is not specified here.
    Int,
    /// String value.
    String,
}
224
/// Describes a default applied to one column when a row is created.
///
/// NOTE(review): the field names suggest the value is taken from the
/// authenticated context (`auth_field`) and written to `column` — confirm
/// against the create path, which is not part of this view.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CreateDefault {
    /// Column that receives the default.
    pub column: &'static str,
    /// Name of the auth field the value comes from (presumed, see above).
    pub auth_field: &'static str,
    /// Type the value is handled as.
    pub ty: CreateDefaultType,
    /// Whether the column accepts NULL. NOTE(review): the effect of a
    /// missing auth value on a non-nullable column is decided elsewhere.
    pub nullable: bool,
}
232
233impl<M, PK> ModelDescriptor<M, PK> {
234    pub const fn new(
235        schema_name: &'static str,
236        table_name: &'static str,
237        columns: &'static [ModelColumn],
238        primary_key: &'static str,
239        allowed_fields: &'static [&'static str],
240        allowed_includes: &'static [&'static str],
241        allowed_sorts: &'static [&'static str],
242        read_allow_policies: &'static [ReadPolicy],
243        read_deny_policies: &'static [ReadPolicy],
244        detail_allow_policies: &'static [ReadPolicy],
245        detail_deny_policies: &'static [ReadPolicy],
246        create_allow_policies: &'static [ReadPolicy],
247        create_deny_policies: &'static [ReadPolicy],
248        update_allow_policies: &'static [ReadPolicy],
249        update_deny_policies: &'static [ReadPolicy],
250        delete_allow_policies: &'static [ReadPolicy],
251        delete_deny_policies: &'static [ReadPolicy],
252        create_defaults: &'static [CreateDefault],
253        emitted_events: &'static [ModelEventKind],
254        version_column: Option<&'static str>,
255        audit_enabled: bool,
256        pii_columns: &'static [&'static str],
257        sensitive_columns: &'static [&'static str],
258        soft_delete_column: Option<&'static str>,
259        retention_days: Option<u32>,
260    ) -> Self {
261        Self {
262            schema_name,
263            table_name,
264            columns,
265            primary_key,
266            allowed_fields,
267            allowed_includes,
268            allowed_sorts,
269            read_allow_policies,
270            read_deny_policies,
271            detail_allow_policies,
272            detail_deny_policies,
273            create_allow_policies,
274            create_deny_policies,
275            update_allow_policies,
276            update_deny_policies,
277            delete_allow_policies,
278            delete_deny_policies,
279            create_defaults,
280            emitted_events,
281            version_column,
282            audit_enabled,
283            pii_columns,
284            sensitive_columns,
285            soft_delete_column,
286            retention_days,
287            _marker: PhantomData,
288        }
289    }
290
291    pub fn emits(&self, operation: ModelEventKind) -> bool {
292        self.emitted_events.contains(&operation)
293    }
294
295    pub fn select_projection(&self) -> String {
296        let mut sql = String::new();
297        for (index, column) in self.columns.iter().enumerate() {
298            if index > 0 {
299                sql.push_str(", ");
300            }
301            let _ = write!(sql, "{} AS \"{}\"", column.sql_name, column.rust_name);
302        }
303        sql
304    }
305}