1use crate::{
2 Error, Key,
3 db::{
4 Db,
5 executor::{
6 FilterEvaluator,
7 plan::{plan_for, scan_plan, set_rows_from_len},
8 },
9 primitives::{FilterDsl, FilterExpr, FilterExt, IntoFilterExpr},
10 query::{DeleteQuery, QueryPlan, QueryValidate},
11 response::Response,
12 store::DataKey,
13 },
14 obs::metrics,
15 traits::{EntityKind, FieldValue},
16};
17use std::{marker::PhantomData, ops::ControlFlow};
18
19struct DeleteAccumulator<'f, E> {
24 filter: Option<&'f FilterExpr>,
25 offset: usize,
26 skipped: usize,
27 limit: Option<usize>,
28 matches: Vec<(DataKey, E)>,
29}
30
31impl<'f, E: EntityKind> DeleteAccumulator<'f, E> {
32 fn new(filter: Option<&'f FilterExpr>, offset: usize, limit: Option<usize>) -> Self {
33 Self {
34 filter,
35 offset,
36 skipped: 0,
37 limit,
38 matches: Vec::with_capacity(limit.unwrap_or(0)),
39 }
40 }
41
42 fn limit_reached(&self) -> bool {
43 self.limit.is_some_and(|lim| self.matches.len() >= lim)
44 }
45
46 fn should_stop(&mut self, dk: DataKey, entity: E) -> bool {
47 if let Some(f) = self.filter
48 && !FilterEvaluator::new(&entity).eval(f)
49 {
50 return false;
51 }
52
53 if self.skipped < self.offset {
54 self.skipped += 1;
55 return false;
56 }
57
58 if self.limit_reached() {
59 return true;
60 }
61
62 self.matches.push((dk, entity));
63 false
64 }
65}
66
67#[derive(Clone, Copy)]
72pub struct DeleteExecutor<E: EntityKind> {
73 db: Db<E::Canister>,
74 debug: bool,
75 _marker: PhantomData<E>,
76}
77
78impl<E: EntityKind> DeleteExecutor<E> {
79 #[must_use]
80 pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
81 Self {
82 db,
83 debug,
84 _marker: PhantomData,
85 }
86 }
87
88 #[must_use]
89 pub const fn debug(mut self) -> Self {
90 self.debug = true;
91 self
92 }
93
94 pub fn one(self, pk: impl FieldValue) -> Result<Response<E>, Error> {
100 let query = DeleteQuery::new().one::<E>(pk);
101 self.execute(query)
102 }
103
104 pub fn only(self) -> Result<Response<E>, Error> {
106 let query = DeleteQuery::new().one::<E>(());
107 self.execute(query)
108 }
109
110 pub fn many<I, V>(self, values: I) -> Result<Response<E>, Error>
112 where
113 I: IntoIterator<Item = V>,
114 V: FieldValue,
115 {
116 let query = DeleteQuery::new().many_by_field(E::PRIMARY_KEY, values);
117
118 self.execute(query)
119 }
120
121 pub fn one_by_field(
127 self,
128 field: impl AsRef<str>,
129 value: impl FieldValue,
130 ) -> Result<Response<E>, Error> {
131 let query = DeleteQuery::new().one_by_field(field, value);
132 self.execute(query)
133 }
134
135 pub fn many_by_field<I, V>(
137 self,
138 field: impl AsRef<str>,
139 values: I,
140 ) -> Result<Response<E>, Error>
141 where
142 I: IntoIterator<Item = V>,
143 V: FieldValue,
144 {
145 let query = DeleteQuery::new().many_by_field(field, values);
146 self.execute(query)
147 }
148
149 pub fn all(self) -> Result<Response<E>, Error> {
151 self.execute(DeleteQuery::new())
152 }
153
154 pub fn filter<F, I>(self, f: F) -> Result<Response<E>, Error>
156 where
157 F: FnOnce(FilterDsl) -> I,
158 I: IntoFilterExpr,
159 {
160 let query = DeleteQuery::new().filter(f);
161 self.execute(query)
162 }
163
164 pub fn ensure_delete_one(self, pk: impl FieldValue) -> Result<(), Error> {
169 self.one(pk)?.require_one()?;
170 Ok(())
171 }
172
173 pub fn ensure_delete_any_by_pk<I, V>(self, pks: I) -> Result<(), Error>
174 where
175 I: IntoIterator<Item = V>,
176 V: FieldValue,
177 {
178 self.many(pks)?.require_some()?;
179
180 Ok(())
181 }
182
183 pub fn ensure_delete_any<I, V>(self, values: I) -> Result<(), Error>
184 where
185 I: IntoIterator<Item = V>,
186 V: FieldValue,
187 {
188 self.ensure_delete_any_by_pk(values)
189 }
190
191 pub fn explain(self, query: DeleteQuery) -> Result<QueryPlan, Error> {
196 QueryValidate::<E>::validate(&query)?;
197 Ok(plan_for::<E>(query.filter.as_ref()))
198 }
199
200 pub fn execute(self, query: DeleteQuery) -> Result<Response<E>, Error> {
201 QueryValidate::<E>::validate(&query)?;
202 let mut span = metrics::Span::<E>::new(metrics::ExecKind::Delete);
203
204 let plan = plan_for::<E>(query.filter.as_ref());
205
206 let limit = query
207 .limit
208 .as_ref()
209 .and_then(|l| l.limit)
210 .map(|l| l as usize);
211
212 let offset = query.limit.as_ref().map_or(0, |l| l.offset as usize);
213 let filter_simplified = query.filter.as_ref().map(|f| f.clone().simplify());
214
215 let mut acc = DeleteAccumulator::new(filter_simplified.as_ref(), offset, limit);
216
217 scan_plan::<E, _>(&self.db, plan, |dk, entity| {
218 if acc.should_stop(dk, entity) {
219 ControlFlow::Break(())
220 } else {
221 ControlFlow::Continue(())
222 }
223 })?;
224
225 let mut res: Vec<(Key, E)> = Vec::with_capacity(acc.matches.len());
226 self.db.context::<E>().with_store_mut(|s| {
227 for (dk, entity) in acc.matches {
228 s.remove(&dk);
229 if !E::INDEXES.is_empty() {
230 self.remove_indexes(&entity)?;
231 }
232 res.push((dk.key(), entity));
233 }
234 Ok::<_, Error>(())
235 })??;
236
237 set_rows_from_len(&mut span, res.len());
238 Ok(Response(res))
239 }
240
241 fn remove_indexes(&self, entity: &E) -> Result<(), Error> {
242 for index in E::INDEXES {
243 let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
244 store.with_borrow_mut(|this| {
245 this.remove_index_entry(entity, index);
246 });
247 }
248 Ok(())
249 }
250}