Skip to main content

cratestack_sqlx/query/write/
delete_many.rs

1//! Bulk DELETE-by-predicate: one statement tombstones (soft-delete)
2//! or removes (hard-delete) every row matching the filter AND the
3//! delete policy.
4//!
5//! Same shape as `update_many` — refuses to run without ≥1 filter so
6//! callers can't accidentally truncate a table at the typed-builder
7//! level.
8
9use cratestack_core::{BatchSummary, CoolContext, CoolError};
10
11use crate::{FilterExpr, ModelDescriptor, SqlxRuntime, sqlx};
12
13use super::delete_many_exec::run_delete_many_in_tx;
14
15#[derive(Debug, Clone)]
16pub struct DeleteMany<'a, M: 'static, PK: 'static> {
17    pub(crate) runtime: &'a SqlxRuntime,
18    pub(crate) descriptor: &'static ModelDescriptor<M, PK>,
19    pub(crate) filters: Vec<FilterExpr>,
20}
21
22impl<'a, M: 'static, PK: 'static> DeleteMany<'a, M, PK> {
23    pub fn where_(mut self, filter: crate::Filter) -> Self {
24        self.filters.push(FilterExpr::from(filter));
25        self
26    }
27
28    pub fn where_expr(mut self, filter: FilterExpr) -> Self {
29        self.filters.push(filter);
30        self
31    }
32
33    pub fn where_any(mut self, filters: impl IntoIterator<Item = FilterExpr>) -> Self {
34        self.filters.push(FilterExpr::any(filters));
35        self
36    }
37
38    /// Conditionally append a filter; `None` is a no-op.
39    pub fn where_optional<F>(mut self, filter: Option<F>) -> Self
40    where
41        F: Into<FilterExpr>,
42    {
43        if let Some(filter) = filter {
44            self.filters.push(filter.into());
45        }
46        self
47    }
48
49    /// Approximate SQL preview. The runtime path interpolates filter
50    /// predicates and the delete policy clause; this returns the rough
51    /// shape for migration tooling and the schema studio.
52    pub fn preview_sql(&self) -> String {
53        let mut sql = match self.descriptor.soft_delete_column {
54            Some(col) => {
55                let mut s = format!("UPDATE {} SET {col} = NOW()", self.descriptor.table_name);
56                if let Some(version_col) = self.descriptor.version_column {
57                    s.push_str(&format!(", {version_col} = {version_col} + 1"));
58                }
59                s.push_str(&format!(" WHERE {col} IS NULL AND "));
60                s
61            }
62            None => format!("DELETE FROM {} WHERE ", self.descriptor.table_name),
63        };
64        sql.push_str("<filters> AND <delete_policy> RETURNING ");
65        sql.push_str(&self.descriptor.select_projection());
66        sql
67    }
68
69    pub async fn run(self, ctx: &CoolContext) -> Result<BatchSummary, CoolError>
70    where
71        for<'r> M: Send + Unpin + sqlx::FromRow<'r, sqlx::postgres::PgRow> + serde::Serialize,
72    {
73        let runtime = self.runtime;
74        let descriptor = self.descriptor;
75        let mut tx = runtime
76            .pool()
77            .begin()
78            .await
79            .map_err(|error| CoolError::Database(error.to_string()))?;
80        let (summary, emits_event) =
81            run_delete_many_in_tx(&mut tx, runtime.pool(), descriptor, &self.filters, ctx).await?;
82        tx.commit()
83            .await
84            .map_err(|error| CoolError::Database(error.to_string()))?;
85        if emits_event {
86            let _ = runtime.drain_event_outbox().await;
87        }
88        Ok(summary)
89    }
90
91    pub async fn run_in_tx<'tx>(
92        self,
93        tx: &mut sqlx::Transaction<'tx, sqlx::Postgres>,
94        ctx: &CoolContext,
95    ) -> Result<BatchSummary, CoolError>
96    where
97        for<'r> M: Send + Unpin + sqlx::FromRow<'r, sqlx::postgres::PgRow> + serde::Serialize,
98    {
99        let (summary, _) =
100            run_delete_many_in_tx(tx, self.runtime.pool(), self.descriptor, &self.filters, ctx)
101                .await?;
102        Ok(summary)
103    }
104}