cratestack_sqlx/query/batch/
delete.rs1use std::collections::HashMap;
6use std::hash::Hash;
7
8use cratestack_core::{
9 AuditOperation, BatchResponse, CoolContext, CoolError, ModelEventKind,
10};
11
12use crate::audit::{build_audit_event, enqueue_audit_event, ensure_audit_table};
13use crate::descriptor::{enqueue_event_outbox, ensure_event_outbox_table};
14use crate::query::support::push_action_policy_query;
15use crate::{ModelDescriptor, ModelPrimaryKey, SqlxRuntime, sqlx};
16
17use super::validate::{reject_duplicate_pks, validate_batch_size};
18
19#[derive(Debug, Clone)]
20pub struct BatchDelete<'a, M: 'static, PK: 'static> {
21 pub(crate) runtime: &'a SqlxRuntime,
22 pub(crate) descriptor: &'static ModelDescriptor<M, PK>,
23 pub(crate) ids: Vec<PK>,
24}
25
26impl<'a, M: 'static, PK: 'static> BatchDelete<'a, M, PK> {
27 pub async fn run(self, ctx: &CoolContext) -> Result<BatchResponse<M>, CoolError>
28 where
29 for<'r> M: Send
30 + Unpin
31 + sqlx::FromRow<'r, sqlx::postgres::PgRow>
32 + ModelPrimaryKey<PK>
33 + serde::Serialize,
34 PK: Clone
35 + Eq
36 + Hash
37 + Send
38 + sqlx::Type<sqlx::Postgres>
39 + for<'q> sqlx::Encode<'q, sqlx::Postgres>,
40 {
41 validate_batch_size(self.ids.len())?;
42 reject_duplicate_pks(&self.ids)?;
43 if self.ids.is_empty() {
44 return Ok(BatchResponse::from_results(vec![]));
45 }
46
47 let emits_event = self.descriptor.emits(ModelEventKind::Deleted);
48 let audit_enabled = self.descriptor.audit_enabled;
49
50 let mut tx = self
51 .runtime
52 .pool()
53 .begin()
54 .await
55 .map_err(|error| CoolError::Database(error.to_string()))?;
56 if emits_event {
57 ensure_event_outbox_table(&mut *tx).await?;
58 }
59 if audit_enabled {
60 ensure_audit_table(self.runtime.pool()).await?;
61 }
62
63 let mut query = sqlx::QueryBuilder::<sqlx::Postgres>::new("");
64 match self.descriptor.soft_delete_column {
65 Some(col) => {
66 query.push("UPDATE ").push(self.descriptor.table_name);
67 query.push(" SET ").push(col).push(" = NOW()");
68 if let Some(version_col) = self.descriptor.version_column {
69 query
70 .push(", ")
71 .push(version_col)
72 .push(" = ")
73 .push(version_col)
74 .push(" + 1");
75 }
76 query.push(" WHERE ").push(col).push(" IS NULL AND ");
77 }
78 None => {
79 query.push("DELETE FROM ").push(self.descriptor.table_name);
80 query.push(" WHERE ");
81 }
82 }
83 query.push(self.descriptor.primary_key).push(" IN (");
84 for (index, id) in self.ids.iter().enumerate() {
85 if index > 0 {
86 query.push(", ");
87 }
88 query.push_bind(id.clone());
89 }
90 query.push(") AND ");
91 push_action_policy_query(
92 &mut query,
93 self.descriptor.delete_allow_policies,
94 self.descriptor.delete_deny_policies,
95 ctx,
96 );
97 query
98 .push(" RETURNING ")
99 .push(self.descriptor.select_projection());
100
101 let deleted: Vec<M> = query
102 .build_query_as::<M>()
103 .fetch_all(&mut *tx)
104 .await
105 .map_err(|error| CoolError::Database(error.to_string()))?;
106
107 for record in &deleted {
110 if emits_event {
111 enqueue_event_outbox(
112 &mut *tx,
113 self.descriptor.schema_name,
114 ModelEventKind::Deleted,
115 record,
116 )
117 .await?;
118 }
119 if audit_enabled {
120 let before = serde_json::to_value(record).ok();
121 let event = build_audit_event(
122 self.descriptor,
123 AuditOperation::Delete,
124 before,
125 None,
126 ctx,
127 );
128 enqueue_audit_event(&mut *tx, &event).await?;
129 }
130 }
131
132 tx.commit()
133 .await
134 .map_err(|error| CoolError::Database(error.to_string()))?;
135
136 if emits_event {
137 let _ = self.runtime.drain_event_outbox().await;
138 }
139
140 let mut by_pk: HashMap<PK, M> =
144 deleted.into_iter().map(|m| (m.primary_key(), m)).collect();
145 let per_item: Vec<Result<M, CoolError>> = self
146 .ids
147 .into_iter()
148 .map(|id| {
149 by_pk
150 .remove(&id)
151 .ok_or_else(|| CoolError::NotFound("no row matched".to_owned()))
152 })
153 .collect();
154
155 Ok(BatchResponse::from_results(per_item))
156 }
157}