// cratestack_sqlx/query/write/delete_many.rs
use cratestack_core::{BatchSummary, CoolContext, CoolError};
10
11use crate::{FilterExpr, ModelDescriptor, SqlxRuntime, sqlx};
12
13use super::delete_many_exec::run_delete_many_in_tx;
14
15#[derive(Debug, Clone)]
16pub struct DeleteMany<'a, M: 'static, PK: 'static> {
17 pub(crate) runtime: &'a SqlxRuntime,
18 pub(crate) descriptor: &'static ModelDescriptor<M, PK>,
19 pub(crate) filters: Vec<FilterExpr>,
20}
21
22impl<'a, M: 'static, PK: 'static> DeleteMany<'a, M, PK> {
23 pub fn where_(mut self, filter: crate::Filter) -> Self {
24 self.filters.push(FilterExpr::from(filter));
25 self
26 }
27
28 pub fn where_expr(mut self, filter: FilterExpr) -> Self {
29 self.filters.push(filter);
30 self
31 }
32
33 pub fn where_any(mut self, filters: impl IntoIterator<Item = FilterExpr>) -> Self {
34 self.filters.push(FilterExpr::any(filters));
35 self
36 }
37
38 pub fn where_optional<F>(mut self, filter: Option<F>) -> Self
40 where
41 F: Into<FilterExpr>,
42 {
43 if let Some(filter) = filter {
44 self.filters.push(filter.into());
45 }
46 self
47 }
48
49 pub fn preview_sql(&self) -> String {
53 let mut sql = match self.descriptor.soft_delete_column {
54 Some(col) => {
55 let mut s = format!("UPDATE {} SET {col} = NOW()", self.descriptor.table_name);
56 if let Some(version_col) = self.descriptor.version_column {
57 s.push_str(&format!(", {version_col} = {version_col} + 1"));
58 }
59 s.push_str(&format!(" WHERE {col} IS NULL AND "));
60 s
61 }
62 None => format!("DELETE FROM {} WHERE ", self.descriptor.table_name),
63 };
64 sql.push_str("<filters> AND <delete_policy> RETURNING ");
65 sql.push_str(&self.descriptor.select_projection());
66 sql
67 }
68
69 pub async fn run(self, ctx: &CoolContext) -> Result<BatchSummary, CoolError>
70 where
71 for<'r> M: Send + Unpin + sqlx::FromRow<'r, sqlx::postgres::PgRow> + serde::Serialize,
72 {
73 let runtime = self.runtime;
74 let descriptor = self.descriptor;
75 let mut tx = runtime
76 .pool()
77 .begin()
78 .await
79 .map_err(|error| CoolError::Database(error.to_string()))?;
80 let (summary, emits_event) =
81 run_delete_many_in_tx(&mut tx, runtime.pool(), descriptor, &self.filters, ctx).await?;
82 tx.commit()
83 .await
84 .map_err(|error| CoolError::Database(error.to_string()))?;
85 if emits_event {
86 let _ = runtime.drain_event_outbox().await;
87 }
88 Ok(summary)
89 }
90
91 pub async fn run_in_tx<'tx>(
92 self,
93 tx: &mut sqlx::Transaction<'tx, sqlx::Postgres>,
94 ctx: &CoolContext,
95 ) -> Result<BatchSummary, CoolError>
96 where
97 for<'r> M: Send + Unpin + sqlx::FromRow<'r, sqlx::postgres::PgRow> + serde::Serialize,
98 {
99 let (summary, _) =
100 run_delete_many_in_tx(tx, self.runtime.pool(), self.descriptor, &self.filters, ctx)
101 .await?;
102 Ok(summary)
103 }
104}