1use crate::{
2 Error, IndexSpec, Key,
3 db::{
4 Db,
5 executor::{
6 ExecutorError, FilterEvaluator, UniqueIndexHandle,
7 plan::{plan_for, scan_plan, set_rows_from_len},
8 resolve_unique_pk,
9 },
10 primitives::{FilterDsl, FilterExpr, FilterExt, IntoFilterExpr},
11 query::{DeleteQuery, QueryPlan, QueryValidate},
12 response::Response,
13 store::DataKey,
14 },
15 deserialize,
16 obs::metrics,
17 traits::{EntityKind, FieldValue, FromKey},
18 visitor::sanitize,
19};
20use std::{marker::PhantomData, ops::ControlFlow};
21
22struct DeleteAccumulator<'f, E> {
27 filter: Option<&'f FilterExpr>,
28 offset: usize,
29 skipped: usize,
30 limit: Option<usize>,
31 matches: Vec<(DataKey, E)>,
32}
33
34impl<'f, E: EntityKind> DeleteAccumulator<'f, E> {
35 fn new(filter: Option<&'f FilterExpr>, offset: usize, limit: Option<usize>) -> Self {
36 Self {
37 filter,
38 offset,
39 skipped: 0,
40 limit,
41 matches: Vec::with_capacity(limit.unwrap_or(0)),
42 }
43 }
44
45 fn limit_reached(&self) -> bool {
46 self.limit.is_some_and(|lim| self.matches.len() >= lim)
47 }
48
49 fn should_stop(&mut self, dk: DataKey, entity: E) -> bool {
50 if let Some(f) = self.filter
51 && !FilterEvaluator::new(&entity).eval(f)
52 {
53 return false;
54 }
55
56 if self.skipped < self.offset {
57 self.skipped += 1;
58 return false;
59 }
60
61 if self.limit_reached() {
62 return true;
63 }
64
65 self.matches.push((dk, entity));
66 false
67 }
68}
69
/// Executor that runs delete operations for entity type `E`.
///
/// Every public operation consumes the executor (`self` by value); the type
/// derives `Clone` and `Copy`.
#[derive(Clone, Copy)]
pub struct DeleteExecutor<E: EntityKind> {
    /// Database handle, typed by `E::Canister`.
    db: Db<E::Canister>,
    /// Debug flag, set through `new` or `debug()`.
    /// NOTE(review): no method visible in this section reads it — confirm
    /// whether it is consumed elsewhere.
    debug: bool,
    /// Binds the executor to `E` without storing a value of that type.
    _marker: PhantomData<E>,
}
80
81impl<E: EntityKind> DeleteExecutor<E> {
82 #[must_use]
83 pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
84 Self {
85 db,
86 debug,
87 _marker: PhantomData,
88 }
89 }
90
91 #[must_use]
92 pub const fn debug(mut self) -> Self {
93 self.debug = true;
94 self
95 }
96
97 pub fn one(self, pk: impl FieldValue) -> Result<Response<E>, Error> {
103 let query = DeleteQuery::new().one::<E>(pk);
104 self.execute(query)
105 }
106
107 pub fn only(self) -> Result<Response<E>, Error> {
109 let query = DeleteQuery::new().one::<E>(());
110 self.execute(query)
111 }
112
113 pub fn many<I, V>(self, values: I) -> Result<Response<E>, Error>
115 where
116 I: IntoIterator<Item = V>,
117 V: FieldValue,
118 {
119 let query = DeleteQuery::new().many_by_field(E::PRIMARY_KEY, values);
120
121 self.execute(query)
122 }
123
124 pub fn by_unique_index(self, index: UniqueIndexHandle, entity: E) -> Result<Response<E>, Error>
130 where
131 E::PrimaryKey: FromKey,
132 {
133 let mut span = metrics::Span::<E>::new(metrics::ExecKind::Delete);
134 let index = index.index();
135 let mut lookup = entity;
136 sanitize(&mut lookup);
137
138 let Some(pk) = resolve_unique_pk::<E>(&self.db, index, &lookup)? else {
139 set_rows_from_len(&mut span, 0);
140 return Ok(Response(Vec::new()));
141 };
142
143 let (dk, stored) = self.load_existing(index, pk)?;
144
145 self.db.context::<E>().with_store_mut(|s| {
146 s.remove(&dk);
147 if !E::INDEXES.is_empty() {
148 self.remove_indexes(&stored)?;
149 }
150 Ok::<_, Error>(())
151 })??;
152
153 set_rows_from_len(&mut span, 1);
154 Ok(Response(vec![(dk.key(), stored)]))
155 }
156
157 pub fn one_by_field(
163 self,
164 field: impl AsRef<str>,
165 value: impl FieldValue,
166 ) -> Result<Response<E>, Error> {
167 let query = DeleteQuery::new().one_by_field(field, value);
168 self.execute(query)
169 }
170
171 pub fn many_by_field<I, V>(
173 self,
174 field: impl AsRef<str>,
175 values: I,
176 ) -> Result<Response<E>, Error>
177 where
178 I: IntoIterator<Item = V>,
179 V: FieldValue,
180 {
181 let query = DeleteQuery::new().many_by_field(field, values);
182 self.execute(query)
183 }
184
185 pub fn all(self) -> Result<Response<E>, Error> {
187 self.execute(DeleteQuery::new())
188 }
189
190 pub fn filter<F, I>(self, f: F) -> Result<Response<E>, Error>
192 where
193 F: FnOnce(FilterDsl) -> I,
194 I: IntoFilterExpr,
195 {
196 let query = DeleteQuery::new().filter(f);
197 self.execute(query)
198 }
199
200 pub fn ensure_delete_one(self, pk: impl FieldValue) -> Result<(), Error> {
205 self.one(pk)?.require_one()?;
206 Ok(())
207 }
208
209 pub fn ensure_delete_any_by_pk<I, V>(self, pks: I) -> Result<(), Error>
210 where
211 I: IntoIterator<Item = V>,
212 V: FieldValue,
213 {
214 self.many(pks)?.require_some()?;
215
216 Ok(())
217 }
218
219 pub fn ensure_delete_any<I, V>(self, values: I) -> Result<(), Error>
220 where
221 I: IntoIterator<Item = V>,
222 V: FieldValue,
223 {
224 self.ensure_delete_any_by_pk(values)
225 }
226
227 pub fn explain(self, query: DeleteQuery) -> Result<QueryPlan, Error> {
232 QueryValidate::<E>::validate(&query)?;
233 Ok(plan_for::<E>(query.filter.as_ref()))
234 }
235
236 pub fn execute(self, query: DeleteQuery) -> Result<Response<E>, Error> {
242 QueryValidate::<E>::validate(&query)?;
243 let mut span = metrics::Span::<E>::new(metrics::ExecKind::Delete);
244
245 let plan = plan_for::<E>(query.filter.as_ref());
246
247 let limit = query
248 .limit
249 .as_ref()
250 .and_then(|l| l.limit)
251 .map(|l| l as usize);
252
253 let offset = query.limit.as_ref().map_or(0, |l| l.offset as usize);
254 let filter_simplified = query.filter.as_ref().map(|f| f.clone().simplify());
255
256 let mut acc = DeleteAccumulator::new(filter_simplified.as_ref(), offset, limit);
257
258 let mut scanned = 0u64;
259 scan_plan::<E, _>(&self.db, plan, |dk, entity| {
260 scanned = scanned.saturating_add(1);
261 if acc.should_stop(dk, entity) {
262 ControlFlow::Break(())
263 } else {
264 ControlFlow::Continue(())
265 }
266 })?;
267
268 metrics::record_rows_scanned_for::<E>(scanned);
269
270 let mut res: Vec<(Key, E)> = Vec::with_capacity(acc.matches.len());
271 self.db.context::<E>().with_store_mut(|s| {
272 for (dk, entity) in acc.matches {
273 s.remove(&dk);
274 if !E::INDEXES.is_empty() {
275 self.remove_indexes(&entity)?;
276 }
277 res.push((dk.key(), entity));
278 }
279 Ok::<_, Error>(())
280 })??;
281
282 set_rows_from_len(&mut span, res.len());
283
284 Ok(Response(res))
285 }
286
287 fn remove_indexes(&self, entity: &E) -> Result<(), Error> {
288 for index in E::INDEXES {
289 let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
290 store.with_borrow_mut(|this| {
291 this.remove_index_entry(entity, index);
292 });
293 }
294 Ok(())
295 }
296
297 fn load_existing(
298 &self,
299 index: &'static IndexSpec,
300 pk: E::PrimaryKey,
301 ) -> Result<(DataKey, E), Error> {
302 let data_key = DataKey::new::<E>(pk.into());
303 let bytes = self
304 .db
305 .context::<E>()
306 .with_store(|store| store.get(&data_key))?;
307
308 let Some(bytes) = bytes else {
309 return Err(ExecutorError::IndexCorrupted(
310 E::PATH.to_string(),
311 index.fields.join(", "),
312 1,
313 )
314 .into());
315 };
316
317 let entity = deserialize::<E>(&bytes)?;
318 Ok((data_key, entity))
319 }
320}