Skip to main content

cratestack_sqlx/query/batch/
delete.rs

1//! `batch_delete` — single `DELETE ... RETURNING` (or `UPDATE` for
2//! soft-delete) with the delete policy in the WHERE. Per-item audit
3//! and outbox events fan out from the RETURNING rows.
4
5use std::collections::HashMap;
6use std::hash::Hash;
7
8use cratestack_core::{
9    AuditOperation, BatchResponse, CoolContext, CoolError, ModelEventKind,
10};
11
12use crate::audit::{build_audit_event, enqueue_audit_event, ensure_audit_table};
13use crate::descriptor::{enqueue_event_outbox, ensure_event_outbox_table};
14use crate::query::support::push_action_policy_query;
15use crate::{ModelDescriptor, ModelPrimaryKey, SqlxRuntime, sqlx};
16
17use super::validate::{reject_duplicate_pks, validate_batch_size};
18
/// Pending batch delete for model `M` keyed by `PK`.
///
/// Holds everything [`BatchDelete::run`] needs: the runtime it obtains the
/// pool from, the model's static descriptor, and the primary keys to remove.
/// `run` consumes the value.
#[derive(Debug, Clone)]
pub struct BatchDelete<'a, M: 'static, PK: 'static> {
    /// Runtime used by `run` to obtain the connection pool and to drain the
    /// event outbox after commit.
    pub(crate) runtime: &'a SqlxRuntime,
    /// Static model metadata read by `run`: table name, primary-key column,
    /// optional soft-delete and version columns, delete policies, audit flag.
    pub(crate) descriptor: &'static ModelDescriptor<M, PK>,
    /// Primary keys to delete. `run` validates the batch size and rejects
    /// duplicates before touching the database.
    pub(crate) ids: Vec<PK>,
}
25
26impl<'a, M: 'static, PK: 'static> BatchDelete<'a, M, PK> {
27    pub async fn run(self, ctx: &CoolContext) -> Result<BatchResponse<M>, CoolError>
28    where
29        for<'r> M: Send
30            + Unpin
31            + sqlx::FromRow<'r, sqlx::postgres::PgRow>
32            + ModelPrimaryKey<PK>
33            + serde::Serialize,
34        PK: Clone
35            + Eq
36            + Hash
37            + Send
38            + sqlx::Type<sqlx::Postgres>
39            + for<'q> sqlx::Encode<'q, sqlx::Postgres>,
40    {
41        validate_batch_size(self.ids.len())?;
42        reject_duplicate_pks(&self.ids)?;
43        if self.ids.is_empty() {
44            return Ok(BatchResponse::from_results(vec![]));
45        }
46
47        let emits_event = self.descriptor.emits(ModelEventKind::Deleted);
48        let audit_enabled = self.descriptor.audit_enabled;
49
50        let mut tx = self
51            .runtime
52            .pool()
53            .begin()
54            .await
55            .map_err(|error| CoolError::Database(error.to_string()))?;
56        if emits_event {
57            ensure_event_outbox_table(&mut *tx).await?;
58        }
59        if audit_enabled {
60            ensure_audit_table(self.runtime.pool()).await?;
61        }
62
63        let mut query = sqlx::QueryBuilder::<sqlx::Postgres>::new("");
64        match self.descriptor.soft_delete_column {
65            Some(col) => {
66                query.push("UPDATE ").push(self.descriptor.table_name);
67                query.push(" SET ").push(col).push(" = NOW()");
68                if let Some(version_col) = self.descriptor.version_column {
69                    query
70                        .push(", ")
71                        .push(version_col)
72                        .push(" = ")
73                        .push(version_col)
74                        .push(" + 1");
75                }
76                query.push(" WHERE ").push(col).push(" IS NULL AND ");
77            }
78            None => {
79                query.push("DELETE FROM ").push(self.descriptor.table_name);
80                query.push(" WHERE ");
81            }
82        }
83        query.push(self.descriptor.primary_key).push(" IN (");
84        for (index, id) in self.ids.iter().enumerate() {
85            if index > 0 {
86                query.push(", ");
87            }
88            query.push_bind(id.clone());
89        }
90        query.push(") AND ");
91        push_action_policy_query(
92            &mut query,
93            self.descriptor.delete_allow_policies,
94            self.descriptor.delete_deny_policies,
95            ctx,
96        );
97        query
98            .push(" RETURNING ")
99            .push(self.descriptor.select_projection());
100
101        let deleted: Vec<M> = query
102            .build_query_as::<M>()
103            .fetch_all(&mut *tx)
104            .await
105            .map_err(|error| CoolError::Database(error.to_string()))?;
106
107        // The RETURNING row IS the "before" snapshot — DELETE/soft-
108        // delete returns the pre-mutation state.
109        for record in &deleted {
110            if emits_event {
111                enqueue_event_outbox(
112                    &mut *tx,
113                    self.descriptor.schema_name,
114                    ModelEventKind::Deleted,
115                    record,
116                )
117                .await?;
118            }
119            if audit_enabled {
120                let before = serde_json::to_value(record).ok();
121                let event = build_audit_event(
122                    self.descriptor,
123                    AuditOperation::Delete,
124                    before,
125                    None,
126                    ctx,
127                );
128                enqueue_audit_event(&mut *tx, &event).await?;
129            }
130        }
131
132        tx.commit()
133            .await
134            .map_err(|error| CoolError::Database(error.to_string()))?;
135
136        if emits_event {
137            let _ = self.runtime.drain_event_outbox().await;
138        }
139
140        // Walk-and-match: any input id whose row isn't in `deleted`
141        // failed the WHERE (tombstoned, policy denied, never existed).
142        // All three collapse to NotFound on the wire.
143        let mut by_pk: HashMap<PK, M> =
144            deleted.into_iter().map(|m| (m.primary_key(), m)).collect();
145        let per_item: Vec<Result<M, CoolError>> = self
146            .ids
147            .into_iter()
148            .map(|id| {
149                by_pk
150                    .remove(&id)
151                    .ok_or_else(|| CoolError::NotFound("no row matched".to_owned()))
152            })
153            .collect();
154
155        Ok(BatchResponse::from_results(per_item))
156    }
157}