Skip to main content

cratestack_sqlx/query/batch/
delete.rs

1//! `batch_delete` — single `DELETE ... RETURNING` (or `UPDATE` for
2//! soft-delete) with the delete policy in the WHERE. Per-item audit
3//! and outbox events fan out from the RETURNING rows.
4
5use std::collections::HashMap;
6use std::hash::Hash;
7
8use cratestack_core::{AuditOperation, BatchResponse, CoolContext, CoolError, ModelEventKind};
9
10use crate::audit::{build_audit_event, enqueue_audit_event, ensure_audit_table};
11use crate::descriptor::{enqueue_event_outbox, ensure_event_outbox_table};
12use crate::query::support::push_action_policy_query;
13use crate::{ModelDescriptor, ModelPrimaryKey, SqlxRuntime, sqlx};
14
15use super::validate::{reject_duplicate_pks, validate_batch_size};
16
17#[derive(Debug, Clone)]
18pub struct BatchDelete<'a, M: 'static, PK: 'static> {
19    pub(crate) runtime: &'a SqlxRuntime,
20    pub(crate) descriptor: &'static ModelDescriptor<M, PK>,
21    pub(crate) ids: Vec<PK>,
22}
23
24impl<'a, M: 'static, PK: 'static> BatchDelete<'a, M, PK> {
25    pub async fn run(self, ctx: &CoolContext) -> Result<BatchResponse<M>, CoolError>
26    where
27        for<'r> M: Send
28            + Unpin
29            + sqlx::FromRow<'r, sqlx::postgres::PgRow>
30            + ModelPrimaryKey<PK>
31            + serde::Serialize,
32        PK: Clone
33            + Eq
34            + Hash
35            + Send
36            + sqlx::Type<sqlx::Postgres>
37            + for<'q> sqlx::Encode<'q, sqlx::Postgres>,
38    {
39        validate_batch_size(self.ids.len())?;
40        reject_duplicate_pks(&self.ids)?;
41        if self.ids.is_empty() {
42            return Ok(BatchResponse::from_results(vec![]));
43        }
44
45        let emits_event = self.descriptor.emits(ModelEventKind::Deleted);
46        let audit_enabled = self.descriptor.audit_enabled;
47
48        let mut tx = self
49            .runtime
50            .pool()
51            .begin()
52            .await
53            .map_err(|error| CoolError::Database(error.to_string()))?;
54        if emits_event {
55            ensure_event_outbox_table(&mut *tx).await?;
56        }
57        if audit_enabled {
58            ensure_audit_table(self.runtime.pool()).await?;
59        }
60
61        let mut query = sqlx::QueryBuilder::<sqlx::Postgres>::new("");
62        match self.descriptor.soft_delete_column {
63            Some(col) => {
64                query.push("UPDATE ").push(self.descriptor.table_name);
65                query.push(" SET ").push(col).push(" = NOW()");
66                if let Some(version_col) = self.descriptor.version_column {
67                    query
68                        .push(", ")
69                        .push(version_col)
70                        .push(" = ")
71                        .push(version_col)
72                        .push(" + 1");
73                }
74                query.push(" WHERE ").push(col).push(" IS NULL AND ");
75            }
76            None => {
77                query.push("DELETE FROM ").push(self.descriptor.table_name);
78                query.push(" WHERE ");
79            }
80        }
81        query.push(self.descriptor.primary_key).push(" IN (");
82        for (index, id) in self.ids.iter().enumerate() {
83            if index > 0 {
84                query.push(", ");
85            }
86            query.push_bind(id.clone());
87        }
88        query.push(") AND ");
89        push_action_policy_query(
90            &mut query,
91            self.descriptor.delete_allow_policies,
92            self.descriptor.delete_deny_policies,
93            ctx,
94        );
95        query
96            .push(" RETURNING ")
97            .push(self.descriptor.select_projection());
98
99        let deleted: Vec<M> = query
100            .build_query_as::<M>()
101            .fetch_all(&mut *tx)
102            .await
103            .map_err(|error| CoolError::Database(error.to_string()))?;
104
105        // The RETURNING row IS the "before" snapshot — DELETE/soft-
106        // delete returns the pre-mutation state.
107        for record in &deleted {
108            if emits_event {
109                enqueue_event_outbox(
110                    &mut *tx,
111                    self.descriptor.schema_name,
112                    ModelEventKind::Deleted,
113                    record,
114                )
115                .await?;
116            }
117            if audit_enabled {
118                let before = serde_json::to_value(record).ok();
119                let event =
120                    build_audit_event(self.descriptor, AuditOperation::Delete, before, None, ctx);
121                enqueue_audit_event(&mut *tx, &event).await?;
122            }
123        }
124
125        tx.commit()
126            .await
127            .map_err(|error| CoolError::Database(error.to_string()))?;
128
129        if emits_event {
130            let _ = self.runtime.drain_event_outbox().await;
131        }
132
133        // Walk-and-match: any input id whose row isn't in `deleted`
134        // failed the WHERE (tombstoned, policy denied, never existed).
135        // All three collapse to NotFound on the wire.
136        let mut by_pk: HashMap<PK, M> = deleted.into_iter().map(|m| (m.primary_key(), m)).collect();
137        let per_item: Vec<Result<M, CoolError>> = self
138            .ids
139            .into_iter()
140            .map(|id| {
141                by_pk
142                    .remove(&id)
143                    .ok_or_else(|| CoolError::NotFound("no row matched".to_owned()))
144            })
145            .collect();
146
147        Ok(BatchResponse::from_results(per_item))
148    }
149}