1use crate::{
2 Error, IndexSpec, Key,
3 db::{
4 Db,
5 executor::{
6 ExecutorError, FilterEvaluator, UniqueIndexHandle,
7 plan::{plan_for, scan_plan, set_rows_from_len},
8 resolve_unique_pk,
9 },
10 primitives::{FilterDsl, FilterExpr, FilterExt, IntoFilterExpr},
11 query::{DeleteQuery, QueryPlan, QueryValidate},
12 response::Response,
13 store::DataKey,
14 },
15 deserialize,
16 obs::metrics,
17 traits::{EntityKind, FieldValue, FromKey},
18 visitor::sanitize,
19};
20use std::{marker::PhantomData, ops::ControlFlow};
21
22struct DeleteAccumulator<'f, E> {
27 filter: Option<&'f FilterExpr>,
28 offset: usize,
29 skipped: usize,
30 limit: Option<usize>,
31 matches: Vec<(DataKey, E)>,
32}
33
34impl<'f, E: EntityKind> DeleteAccumulator<'f, E> {
35 fn new(filter: Option<&'f FilterExpr>, offset: usize, limit: Option<usize>) -> Self {
36 Self {
37 filter,
38 offset,
39 skipped: 0,
40 limit,
41 matches: Vec::with_capacity(limit.unwrap_or(0)),
42 }
43 }
44
45 fn limit_reached(&self) -> bool {
46 self.limit.is_some_and(|lim| self.matches.len() >= lim)
47 }
48
49 fn should_stop(&mut self, dk: DataKey, entity: E) -> bool {
50 if let Some(f) = self.filter
51 && !FilterEvaluator::new(&entity).eval(f)
52 {
53 return false;
54 }
55
56 if self.skipped < self.offset {
57 self.skipped += 1;
58 return false;
59 }
60
61 if self.limit_reached() {
62 return true;
63 }
64
65 self.matches.push((dk, entity));
66 false
67 }
68}
69
70#[derive(Clone, Copy)]
75pub struct DeleteExecutor<E: EntityKind> {
76 db: Db<E::Canister>,
77 debug: bool,
78 _marker: PhantomData<E>,
79}
80
81impl<E: EntityKind> DeleteExecutor<E> {
82 #[must_use]
83 pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
84 Self {
85 db,
86 debug,
87 _marker: PhantomData,
88 }
89 }
90
91 #[must_use]
92 pub const fn debug(mut self) -> Self {
93 self.debug = true;
94 self
95 }
96
97 pub fn one(self, pk: impl FieldValue) -> Result<Response<E>, Error> {
103 let query = DeleteQuery::new().one::<E>(pk);
104 self.execute(query)
105 }
106
107 pub fn only(self) -> Result<Response<E>, Error> {
109 let query = DeleteQuery::new().one::<E>(());
110 self.execute(query)
111 }
112
113 pub fn many<I, V>(self, values: I) -> Result<Response<E>, Error>
115 where
116 I: IntoIterator<Item = V>,
117 V: FieldValue,
118 {
119 let query = DeleteQuery::new().many_by_field(E::PRIMARY_KEY, values);
120
121 self.execute(query)
122 }
123
124 pub fn by_unique_index(self, index: UniqueIndexHandle, entity: E) -> Result<Response<E>, Error>
130 where
131 E::PrimaryKey: FromKey,
132 {
133 let mut span = metrics::Span::<E>::new(metrics::ExecKind::Delete);
134 let index = index.index();
135 let mut lookup = entity;
136 sanitize(&mut lookup);
137
138 let Some(pk) = resolve_unique_pk::<E>(&self.db, index, &lookup)? else {
139 set_rows_from_len(&mut span, 0);
140 return Ok(Response(Vec::new()));
141 };
142
143 let (dk, stored) = self.load_existing(index, pk)?;
144
145 self.db.context::<E>().with_store_mut(|s| {
146 s.remove(&dk);
147 if !E::INDEXES.is_empty() {
148 self.remove_indexes(&stored)?;
149 }
150 Ok::<_, Error>(())
151 })??;
152
153 set_rows_from_len(&mut span, 1);
154 Ok(Response(vec![(dk.key(), stored)]))
155 }
156
157 pub fn one_by_field(
163 self,
164 field: impl AsRef<str>,
165 value: impl FieldValue,
166 ) -> Result<Response<E>, Error> {
167 let query = DeleteQuery::new().one_by_field(field, value);
168 self.execute(query)
169 }
170
171 pub fn many_by_field<I, V>(
173 self,
174 field: impl AsRef<str>,
175 values: I,
176 ) -> Result<Response<E>, Error>
177 where
178 I: IntoIterator<Item = V>,
179 V: FieldValue,
180 {
181 let query = DeleteQuery::new().many_by_field(field, values);
182 self.execute(query)
183 }
184
185 pub fn all(self) -> Result<Response<E>, Error> {
187 self.execute(DeleteQuery::new())
188 }
189
190 pub fn filter<F, I>(self, f: F) -> Result<Response<E>, Error>
192 where
193 F: FnOnce(FilterDsl) -> I,
194 I: IntoFilterExpr,
195 {
196 let query = DeleteQuery::new().filter(f);
197 self.execute(query)
198 }
199
200 pub fn ensure_delete_one(self, pk: impl FieldValue) -> Result<(), Error> {
205 self.one(pk)?.require_one()?;
206 Ok(())
207 }
208
209 pub fn ensure_delete_any_by_pk<I, V>(self, pks: I) -> Result<(), Error>
210 where
211 I: IntoIterator<Item = V>,
212 V: FieldValue,
213 {
214 self.many(pks)?.require_some()?;
215
216 Ok(())
217 }
218
219 pub fn ensure_delete_any<I, V>(self, values: I) -> Result<(), Error>
220 where
221 I: IntoIterator<Item = V>,
222 V: FieldValue,
223 {
224 self.ensure_delete_any_by_pk(values)
225 }
226
227 pub fn explain(self, query: DeleteQuery) -> Result<QueryPlan, Error> {
232 QueryValidate::<E>::validate(&query)?;
233 Ok(plan_for::<E>(query.filter.as_ref()))
234 }
235
236 pub fn execute(self, query: DeleteQuery) -> Result<Response<E>, Error> {
242 QueryValidate::<E>::validate(&query)?;
243 let mut span = metrics::Span::<E>::new(metrics::ExecKind::Delete);
244
245 let plan = plan_for::<E>(query.filter.as_ref());
246
247 let limit = query
248 .limit
249 .as_ref()
250 .and_then(|l| l.limit)
251 .map(|l| l as usize);
252
253 let offset = query.limit.as_ref().map_or(0, |l| l.offset as usize);
254 let filter_simplified = query.filter.as_ref().map(|f| f.clone().simplify());
255
256 let mut acc = DeleteAccumulator::new(filter_simplified.as_ref(), offset, limit);
257
258 scan_plan::<E, _>(&self.db, plan, |dk, entity| {
259 if acc.should_stop(dk, entity) {
260 ControlFlow::Break(())
261 } else {
262 ControlFlow::Continue(())
263 }
264 })?;
265
266 let mut res: Vec<(Key, E)> = Vec::with_capacity(acc.matches.len());
267 self.db.context::<E>().with_store_mut(|s| {
268 for (dk, entity) in acc.matches {
269 s.remove(&dk);
270 if !E::INDEXES.is_empty() {
271 self.remove_indexes(&entity)?;
272 }
273 res.push((dk.key(), entity));
274 }
275 Ok::<_, Error>(())
276 })??;
277
278 set_rows_from_len(&mut span, res.len());
279
280 Ok(Response(res))
281 }
282
283 fn remove_indexes(&self, entity: &E) -> Result<(), Error> {
284 for index in E::INDEXES {
285 let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
286 store.with_borrow_mut(|this| {
287 this.remove_index_entry(entity, index);
288 });
289 }
290 Ok(())
291 }
292
293 fn load_existing(
294 &self,
295 index: &'static IndexSpec,
296 pk: E::PrimaryKey,
297 ) -> Result<(DataKey, E), Error> {
298 let data_key = DataKey::new::<E>(pk.into());
299 let bytes = self
300 .db
301 .context::<E>()
302 .with_store(|store| store.get(&data_key))?;
303
304 let Some(bytes) = bytes else {
305 return Err(ExecutorError::IndexCorrupted(
306 E::PATH.to_string(),
307 index.fields.join(", "),
308 1,
309 )
310 .into());
311 };
312
313 let entity = deserialize::<E>(&bytes)?;
314 Ok((data_key, entity))
315 }
316}