1use crate::{
2 IndexSpec, Key,
3 db::{
4 Db,
5 executor::{
6 ExecutorError, FilterEvaluator, UniqueIndexHandle, WriteUnit,
7 plan::{plan_for, scan_plan, set_rows_from_len},
8 resolve_unique_pk,
9 },
10 primitives::{FilterDsl, FilterExpr, FilterExt, IntoFilterExpr},
11 query::{DeleteQuery, QueryPlan, QueryValidate},
12 response::Response,
13 store::DataKey,
14 },
15 deserialize,
16 obs::metrics,
17 runtime_error::RuntimeError,
18 sanitize,
19 traits::{EntityKind, FieldValue, FromKey},
20};
21use std::{marker::PhantomData, ops::ControlFlow};
22
23struct DeleteAccumulator<'f, E> {
28 filter: Option<&'f FilterExpr>,
29 offset: usize,
30 skipped: usize,
31 limit: Option<usize>,
32 matches: Vec<(DataKey, E)>,
33}
34
35impl<'f, E: EntityKind> DeleteAccumulator<'f, E> {
36 fn new(filter: Option<&'f FilterExpr>, offset: usize, limit: Option<usize>) -> Self {
37 Self {
38 filter,
39 offset,
40 skipped: 0,
41 limit,
42 matches: Vec::with_capacity(limit.unwrap_or(0)),
43 }
44 }
45
46 fn limit_reached(&self) -> bool {
47 self.limit.is_some_and(|lim| self.matches.len() >= lim)
48 }
49
50 fn should_stop(&mut self, dk: DataKey, entity: E) -> bool {
51 if let Some(f) = self.filter
52 && !FilterEvaluator::new(&entity).eval(f)
53 {
54 return false;
55 }
56
57 if self.skipped < self.offset {
58 self.skipped += 1;
59 return false;
60 }
61
62 if self.limit_reached() {
63 return true;
64 }
65
66 self.matches.push((dk, entity));
67 false
68 }
69}
70
71#[derive(Clone, Copy)]
76pub struct DeleteExecutor<E: EntityKind> {
77 db: Db<E::Canister>,
78 debug: bool,
79 _marker: PhantomData<E>,
80}
81
82impl<E: EntityKind> DeleteExecutor<E> {
83 #[must_use]
84 pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
85 Self {
86 db,
87 debug,
88 _marker: PhantomData,
89 }
90 }
91
92 #[must_use]
93 pub const fn debug(mut self) -> Self {
94 self.debug = true;
95 self
96 }
97
98 pub fn one(self, pk: impl FieldValue) -> Result<Response<E>, RuntimeError> {
104 let query = DeleteQuery::new().one::<E>(pk);
105 self.execute(query)
106 }
107
108 pub fn only(self) -> Result<Response<E>, RuntimeError> {
110 let query = DeleteQuery::new().one::<E>(());
111 self.execute(query)
112 }
113
114 pub fn many<I, V>(self, values: I) -> Result<Response<E>, RuntimeError>
116 where
117 I: IntoIterator<Item = V>,
118 V: FieldValue,
119 {
120 let query = DeleteQuery::new().many_by_field(E::PRIMARY_KEY, values);
121
122 self.execute(query)
123 }
124
125 pub fn by_unique_index(
131 self,
132 index: UniqueIndexHandle,
133 entity: E,
134 ) -> Result<Response<E>, RuntimeError>
135 where
136 E::PrimaryKey: FromKey,
137 {
138 let mut span = metrics::Span::<E>::new(metrics::ExecKind::Delete);
139 let index = index.index();
140 let mut lookup = entity;
141 sanitize(&mut lookup)?;
142
143 let Some(pk) = resolve_unique_pk::<E>(&self.db, index, &lookup)? else {
144 set_rows_from_len(&mut span, 0);
145 return Ok(Response(Vec::new()));
146 };
147
148 let (dk, stored) = self.load_existing(index, pk)?;
149
150 self.db.context::<E>().with_store_mut(|s| {
151 let _unit = WriteUnit::new("delete_unique_row");
155 s.remove(&dk);
156 if !E::INDEXES.is_empty() {
157 self.remove_indexes(&stored)?;
158 }
159 Ok::<_, RuntimeError>(())
160 })??;
161
162 set_rows_from_len(&mut span, 1);
163 Ok(Response(vec![(dk.key(), stored)]))
164 }
165
166 pub fn one_by_field(
172 self,
173 field: impl AsRef<str>,
174 value: impl FieldValue,
175 ) -> Result<Response<E>, RuntimeError> {
176 let query = DeleteQuery::new().one_by_field(field, value);
177 self.execute(query)
178 }
179
180 pub fn many_by_field<I, V>(
182 self,
183 field: impl AsRef<str>,
184 values: I,
185 ) -> Result<Response<E>, RuntimeError>
186 where
187 I: IntoIterator<Item = V>,
188 V: FieldValue,
189 {
190 let query = DeleteQuery::new().many_by_field(field, values);
191 self.execute(query)
192 }
193
194 pub fn all(self) -> Result<Response<E>, RuntimeError> {
196 self.execute(DeleteQuery::new())
197 }
198
199 pub fn filter<F, I>(self, f: F) -> Result<Response<E>, RuntimeError>
201 where
202 F: FnOnce(FilterDsl) -> I,
203 I: IntoFilterExpr,
204 {
205 let query = DeleteQuery::new().filter(f);
206 self.execute(query)
207 }
208
209 pub fn ensure_delete_one(self, pk: impl FieldValue) -> Result<(), RuntimeError> {
214 self.one(pk)?.require_one()?;
215 Ok(())
216 }
217
218 pub fn ensure_delete_any_by_pk<I, V>(self, pks: I) -> Result<(), RuntimeError>
219 where
220 I: IntoIterator<Item = V>,
221 V: FieldValue,
222 {
223 self.many(pks)?.require_some()?;
224
225 Ok(())
226 }
227
228 pub fn ensure_delete_any<I, V>(self, values: I) -> Result<(), RuntimeError>
229 where
230 I: IntoIterator<Item = V>,
231 V: FieldValue,
232 {
233 self.ensure_delete_any_by_pk(values)
234 }
235
236 pub fn explain(self, query: DeleteQuery) -> Result<QueryPlan, RuntimeError> {
241 QueryValidate::<E>::validate(&query)?;
242
243 Ok(plan_for::<E>(query.filter.as_ref()))
244 }
245
246 pub fn execute(self, query: DeleteQuery) -> Result<Response<E>, RuntimeError> {
252 QueryValidate::<E>::validate(&query)?;
253 let mut span = metrics::Span::<E>::new(metrics::ExecKind::Delete);
254
255 let plan = plan_for::<E>(query.filter.as_ref());
256
257 let limit = query
258 .limit
259 .as_ref()
260 .and_then(|l| l.limit)
261 .map(|l| l as usize);
262
263 let offset = query.limit.as_ref().map_or(0, |l| l.offset as usize);
264 let filter_simplified = query.filter.as_ref().map(|f| f.clone().simplify());
265
266 let mut acc = DeleteAccumulator::new(filter_simplified.as_ref(), offset, limit);
267
268 let mut scanned = 0u64;
269 scan_plan::<E, _>(&self.db, plan, |dk, entity| {
270 scanned = scanned.saturating_add(1);
271 if acc.should_stop(dk, entity) {
272 ControlFlow::Break(())
273 } else {
274 ControlFlow::Continue(())
275 }
276 })?;
277
278 metrics::record_rows_scanned_for::<E>(scanned);
279
280 let mut res: Vec<(Key, E)> = Vec::with_capacity(acc.matches.len());
281 self.db.context::<E>().with_store_mut(|s| {
282 for (dk, entity) in acc.matches {
287 let _unit = WriteUnit::new("delete_row");
288 s.remove(&dk);
289 if !E::INDEXES.is_empty() {
290 self.remove_indexes(&entity)?;
291 }
292 res.push((dk.key(), entity));
293 }
294 Ok::<_, RuntimeError>(())
295 })??;
296
297 set_rows_from_len(&mut span, res.len());
298
299 Ok(Response(res))
300 }
301
302 fn remove_indexes(&self, entity: &E) -> Result<(), RuntimeError> {
303 for index in E::INDEXES {
304 let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
305 store.with_borrow_mut(|this| {
306 this.remove_index_entry(entity, index);
307 });
308 }
309 Ok(())
310 }
311
312 fn load_existing(
313 &self,
314 index: &'static IndexSpec,
315 pk: E::PrimaryKey,
316 ) -> Result<(DataKey, E), RuntimeError> {
317 let data_key = DataKey::new::<E>(pk.into());
318 let bytes = self
319 .db
320 .context::<E>()
321 .with_store(|store| store.get(&data_key))?;
322
323 let Some(bytes) = bytes else {
324 return Err(ExecutorError::IndexCorrupted(
325 E::PATH.to_string(),
326 index.fields.join(", "),
327 1,
328 )
329 .into());
330 };
331
332 let entity = deserialize::<E>(&bytes)?;
333 Ok((data_key, entity))
334 }
335}