cratestack_sqlx/query/batch/
delete.rs1use std::collections::HashMap;
6use std::hash::Hash;
7
8use cratestack_core::{AuditOperation, BatchResponse, CoolContext, CoolError, ModelEventKind};
9
10use crate::audit::{build_audit_event, enqueue_audit_event, ensure_audit_table};
11use crate::descriptor::{enqueue_event_outbox, ensure_event_outbox_table};
12use crate::query::support::push_action_policy_query;
13use crate::{ModelDescriptor, ModelPrimaryKey, SqlxRuntime, sqlx};
14
15use super::validate::{reject_duplicate_pks, validate_batch_size};
16
17#[derive(Debug, Clone)]
18pub struct BatchDelete<'a, M: 'static, PK: 'static> {
19 pub(crate) runtime: &'a SqlxRuntime,
20 pub(crate) descriptor: &'static ModelDescriptor<M, PK>,
21 pub(crate) ids: Vec<PK>,
22}
23
24impl<'a, M: 'static, PK: 'static> BatchDelete<'a, M, PK> {
25 pub async fn run(self, ctx: &CoolContext) -> Result<BatchResponse<M>, CoolError>
26 where
27 for<'r> M: Send
28 + Unpin
29 + sqlx::FromRow<'r, sqlx::postgres::PgRow>
30 + ModelPrimaryKey<PK>
31 + serde::Serialize,
32 PK: Clone
33 + Eq
34 + Hash
35 + Send
36 + sqlx::Type<sqlx::Postgres>
37 + for<'q> sqlx::Encode<'q, sqlx::Postgres>,
38 {
39 validate_batch_size(self.ids.len())?;
40 reject_duplicate_pks(&self.ids)?;
41 if self.ids.is_empty() {
42 return Ok(BatchResponse::from_results(vec![]));
43 }
44
45 let emits_event = self.descriptor.emits(ModelEventKind::Deleted);
46 let audit_enabled = self.descriptor.audit_enabled;
47
48 let mut tx = self
49 .runtime
50 .pool()
51 .begin()
52 .await
53 .map_err(|error| CoolError::Database(error.to_string()))?;
54 if emits_event {
55 ensure_event_outbox_table(&mut *tx).await?;
56 }
57 if audit_enabled {
58 ensure_audit_table(self.runtime.pool()).await?;
59 }
60
61 let mut query = sqlx::QueryBuilder::<sqlx::Postgres>::new("");
62 match self.descriptor.soft_delete_column {
63 Some(col) => {
64 query.push("UPDATE ").push(self.descriptor.table_name);
65 query.push(" SET ").push(col).push(" = NOW()");
66 if let Some(version_col) = self.descriptor.version_column {
67 query
68 .push(", ")
69 .push(version_col)
70 .push(" = ")
71 .push(version_col)
72 .push(" + 1");
73 }
74 query.push(" WHERE ").push(col).push(" IS NULL AND ");
75 }
76 None => {
77 query.push("DELETE FROM ").push(self.descriptor.table_name);
78 query.push(" WHERE ");
79 }
80 }
81 query.push(self.descriptor.primary_key).push(" IN (");
82 for (index, id) in self.ids.iter().enumerate() {
83 if index > 0 {
84 query.push(", ");
85 }
86 query.push_bind(id.clone());
87 }
88 query.push(") AND ");
89 push_action_policy_query(
90 &mut query,
91 self.descriptor.delete_allow_policies,
92 self.descriptor.delete_deny_policies,
93 ctx,
94 );
95 query
96 .push(" RETURNING ")
97 .push(self.descriptor.select_projection());
98
99 let deleted: Vec<M> = query
100 .build_query_as::<M>()
101 .fetch_all(&mut *tx)
102 .await
103 .map_err(|error| CoolError::Database(error.to_string()))?;
104
105 for record in &deleted {
108 if emits_event {
109 enqueue_event_outbox(
110 &mut *tx,
111 self.descriptor.schema_name,
112 ModelEventKind::Deleted,
113 record,
114 )
115 .await?;
116 }
117 if audit_enabled {
118 let before = serde_json::to_value(record).ok();
119 let event =
120 build_audit_event(self.descriptor, AuditOperation::Delete, before, None, ctx);
121 enqueue_audit_event(&mut *tx, &event).await?;
122 }
123 }
124
125 tx.commit()
126 .await
127 .map_err(|error| CoolError::Database(error.to_string()))?;
128
129 if emits_event {
130 let _ = self.runtime.drain_event_outbox().await;
131 }
132
133 let mut by_pk: HashMap<PK, M> = deleted.into_iter().map(|m| (m.primary_key(), m)).collect();
137 let per_item: Vec<Result<M, CoolError>> = self
138 .ids
139 .into_iter()
140 .map(|id| {
141 by_pk
142 .remove(&id)
143 .ok_or_else(|| CoolError::NotFound("no row matched".to_owned()))
144 })
145 .collect();
146
147 Ok(BatchResponse::from_results(per_item))
148 }
149}