Skip to main content

cratestack_sqlx/query/write/
delete.rs

1//! `DeleteRecord` — single-row DELETE (soft or hard) with policy +
2//! audit + event fan-out. The RETURNING row doubles as the audit
3//! "before" snapshot for hard deletes.
4
5use cratestack_core::{AuditOperation, CoolContext, CoolError, ModelEventKind};
6
7use crate::audit::{build_audit_event, enqueue_audit_event, ensure_audit_table};
8use crate::descriptor::{enqueue_event_outbox, ensure_event_outbox_table};
9use crate::{ModelDescriptor, SqlxRuntime, sqlx};
10
11use super::delete_exec::delete_returning_record;
12
13#[derive(Debug, Clone)]
14pub struct DeleteRecord<'a, M: 'static, PK: 'static> {
15    pub(crate) runtime: &'a SqlxRuntime,
16    pub(crate) descriptor: &'static ModelDescriptor<M, PK>,
17    pub(crate) id: PK,
18}
19
20impl<'a, M: 'static, PK: 'static> DeleteRecord<'a, M, PK> {
21    pub fn preview_sql(&self) -> String {
22        format!(
23            "DELETE FROM {} WHERE {} = $1 RETURNING {}",
24            self.descriptor.table_name,
25            self.descriptor.primary_key,
26            self.descriptor.select_projection(),
27        )
28    }
29
30    /// Like [`Self::run`] but participates in a caller-supplied transaction.
31    pub async fn run_in_tx<'tx>(
32        self,
33        tx: &mut sqlx::Transaction<'tx, sqlx::Postgres>,
34        ctx: &CoolContext,
35    ) -> Result<M, CoolError>
36    where
37        for<'r> M: Send + Unpin + sqlx::FromRow<'r, sqlx::postgres::PgRow> + serde::Serialize,
38        PK: Send + sqlx::Type<sqlx::Postgres> + for<'q> sqlx::Encode<'q, sqlx::Postgres>,
39    {
40        let emits_event = self.descriptor.emits(ModelEventKind::Deleted);
41        let audit_enabled = self.descriptor.audit_enabled;
42        if emits_event {
43            ensure_event_outbox_table(&mut **tx).await?;
44        }
45        if audit_enabled {
46            ensure_audit_table(self.runtime.pool()).await?;
47        }
48        let record = delete_returning_record(&mut **tx, self.descriptor, self.id, ctx).await?;
49        if emits_event {
50            enqueue_event_outbox(
51                &mut **tx,
52                self.descriptor.schema_name,
53                ModelEventKind::Deleted,
54                &record,
55            )
56            .await?;
57        }
58        if audit_enabled {
59            let before = serde_json::to_value(&record).ok();
60            let event =
61                build_audit_event(self.descriptor, AuditOperation::Delete, before, None, ctx);
62            enqueue_audit_event(&mut **tx, &event).await?;
63        }
64        Ok(record)
65    }
66
67    pub async fn run(self, ctx: &CoolContext) -> Result<M, CoolError>
68    where
69        for<'r> M: Send + Unpin + sqlx::FromRow<'r, sqlx::postgres::PgRow> + serde::Serialize,
70        PK: Send + sqlx::Type<sqlx::Postgres> + for<'q> sqlx::Encode<'q, sqlx::Postgres>,
71    {
72        let emits_event = self.descriptor.emits(ModelEventKind::Deleted);
73        let audit_enabled = self.descriptor.audit_enabled;
74        let needs_tx = emits_event || audit_enabled;
75        let record = if needs_tx {
76            let mut tx = self
77                .runtime
78                .pool()
79                .begin()
80                .await
81                .map_err(|error| CoolError::Database(error.to_string()))?;
82            if emits_event {
83                ensure_event_outbox_table(&mut *tx).await?;
84            }
85            if audit_enabled {
86                ensure_audit_table(self.runtime.pool()).await?;
87            }
88
89            let record = delete_returning_record(&mut *tx, self.descriptor, self.id, ctx).await?;
90            if emits_event {
91                enqueue_event_outbox(
92                    &mut *tx,
93                    self.descriptor.schema_name,
94                    ModelEventKind::Deleted,
95                    &record,
96                )
97                .await?;
98            }
99            if audit_enabled {
100                // DELETE ... RETURNING yields the row's pre-delete
101                // state, so it doubles as the audit `before` snapshot.
102                let before = serde_json::to_value(&record).ok();
103                let event =
104                    build_audit_event(self.descriptor, AuditOperation::Delete, before, None, ctx);
105                enqueue_audit_event(&mut *tx, &event).await?;
106            }
107            tx.commit()
108                .await
109                .map_err(|error| CoolError::Database(error.to_string()))?;
110            record
111        } else {
112            delete_returning_record(self.runtime.pool(), self.descriptor, self.id, ctx).await?
113        };
114
115        if emits_event {
116            let _ = self.runtime.drain_event_outbox().await;
117        }
118
119        Ok(record)
120    }
121}