cratestack_sqlx/query/write/
delete.rs1use cratestack_core::{AuditOperation, CoolContext, CoolError, ModelEventKind};
6
7use crate::audit::{build_audit_event, enqueue_audit_event, ensure_audit_table};
8use crate::descriptor::{enqueue_event_outbox, ensure_event_outbox_table};
9use crate::{ModelDescriptor, SqlxRuntime, sqlx};
10
11use super::delete_exec::delete_returning_record;
12
13#[derive(Debug, Clone)]
14pub struct DeleteRecord<'a, M: 'static, PK: 'static> {
15 pub(crate) runtime: &'a SqlxRuntime,
16 pub(crate) descriptor: &'static ModelDescriptor<M, PK>,
17 pub(crate) id: PK,
18}
19
20impl<'a, M: 'static, PK: 'static> DeleteRecord<'a, M, PK> {
21 pub fn preview_sql(&self) -> String {
22 format!(
23 "DELETE FROM {} WHERE {} = $1 RETURNING {}",
24 self.descriptor.table_name,
25 self.descriptor.primary_key,
26 self.descriptor.select_projection(),
27 )
28 }
29
30 pub async fn run_in_tx<'tx>(
32 self,
33 tx: &mut sqlx::Transaction<'tx, sqlx::Postgres>,
34 ctx: &CoolContext,
35 ) -> Result<M, CoolError>
36 where
37 for<'r> M: Send + Unpin + sqlx::FromRow<'r, sqlx::postgres::PgRow> + serde::Serialize,
38 PK: Send + sqlx::Type<sqlx::Postgres> + for<'q> sqlx::Encode<'q, sqlx::Postgres>,
39 {
40 let emits_event = self.descriptor.emits(ModelEventKind::Deleted);
41 let audit_enabled = self.descriptor.audit_enabled;
42 if emits_event {
43 ensure_event_outbox_table(&mut **tx).await?;
44 }
45 if audit_enabled {
46 ensure_audit_table(self.runtime.pool()).await?;
47 }
48 let record = delete_returning_record(&mut **tx, self.descriptor, self.id, ctx).await?;
49 if emits_event {
50 enqueue_event_outbox(
51 &mut **tx,
52 self.descriptor.schema_name,
53 ModelEventKind::Deleted,
54 &record,
55 )
56 .await?;
57 }
58 if audit_enabled {
59 let before = serde_json::to_value(&record).ok();
60 let event =
61 build_audit_event(self.descriptor, AuditOperation::Delete, before, None, ctx);
62 enqueue_audit_event(&mut **tx, &event).await?;
63 }
64 Ok(record)
65 }
66
67 pub async fn run(self, ctx: &CoolContext) -> Result<M, CoolError>
68 where
69 for<'r> M: Send + Unpin + sqlx::FromRow<'r, sqlx::postgres::PgRow> + serde::Serialize,
70 PK: Send + sqlx::Type<sqlx::Postgres> + for<'q> sqlx::Encode<'q, sqlx::Postgres>,
71 {
72 let emits_event = self.descriptor.emits(ModelEventKind::Deleted);
73 let audit_enabled = self.descriptor.audit_enabled;
74 let needs_tx = emits_event || audit_enabled;
75 let record = if needs_tx {
76 let mut tx = self
77 .runtime
78 .pool()
79 .begin()
80 .await
81 .map_err(|error| CoolError::Database(error.to_string()))?;
82 if emits_event {
83 ensure_event_outbox_table(&mut *tx).await?;
84 }
85 if audit_enabled {
86 ensure_audit_table(self.runtime.pool()).await?;
87 }
88
89 let record = delete_returning_record(&mut *tx, self.descriptor, self.id, ctx).await?;
90 if emits_event {
91 enqueue_event_outbox(
92 &mut *tx,
93 self.descriptor.schema_name,
94 ModelEventKind::Deleted,
95 &record,
96 )
97 .await?;
98 }
99 if audit_enabled {
100 let before = serde_json::to_value(&record).ok();
103 let event =
104 build_audit_event(self.descriptor, AuditOperation::Delete, before, None, ctx);
105 enqueue_audit_event(&mut *tx, &event).await?;
106 }
107 tx.commit()
108 .await
109 .map_err(|error| CoolError::Database(error.to_string()))?;
110 record
111 } else {
112 delete_returning_record(self.runtime.pool(), self.descriptor, self.id, ctx).await?
113 };
114
115 if emits_event {
116 let _ = self.runtime.drain_event_outbox().await;
117 }
118
119 Ok(record)
120 }
121}