1use crate::{
2 db::{
3 Db,
4 executor::{
5 ExecutorError, FilterEvaluator, UniqueIndexHandle, WriteUnit,
6 plan::{plan_for, record_plan_metrics, scan_strict, set_rows_from_len},
7 resolve_unique_pk,
8 },
9 primitives::{FilterDsl, FilterExpr, FilterExt, IntoFilterExpr},
10 query::{DeleteQuery, QueryPlan, QueryValidate},
11 response::Response,
12 store::{DataKey, IndexRemoveOutcome},
13 },
14 error::{ErrorOrigin, InternalError},
15 obs::sink::{self, ExecKind, MetricsEvent, Span},
16 prelude::*,
17 sanitize::sanitize,
18 traits::{EntityKind, FieldValue, FromKey},
19};
20use std::{marker::PhantomData, ops::ControlFlow};
21
22struct DeleteAccumulator<'f, E> {
27 filter: Option<&'f FilterExpr>,
28 offset: usize,
29 skipped: usize,
30 limit: Option<usize>,
31 matches: Vec<(DataKey, E)>,
32}
33
34impl<'f, E: EntityKind> DeleteAccumulator<'f, E> {
35 fn new(filter: Option<&'f FilterExpr>, offset: usize, limit: Option<usize>) -> Self {
36 Self {
37 filter,
38 offset,
39 skipped: 0,
40 limit,
41 matches: Vec::with_capacity(limit.unwrap_or(0)),
42 }
43 }
44
45 fn limit_reached(&self) -> bool {
46 self.limit.is_some_and(|lim| self.matches.len() >= lim)
47 }
48
49 fn should_stop(&mut self, dk: DataKey, entity: E) -> bool {
50 if let Some(f) = self.filter
51 && !FilterEvaluator::new(&entity).eval(f)
52 {
53 return false;
54 }
55
56 if self.skipped < self.offset {
57 self.skipped += 1;
58 return false;
59 }
60
61 if self.limit_reached() {
62 return true;
63 }
64
65 self.matches.push((dk, entity));
66 false
67 }
68}
69
70#[derive(Clone, Copy)]
75pub struct DeleteExecutor<E: EntityKind> {
76 db: Db<E::Canister>,
77 debug: bool,
78 _marker: PhantomData<E>,
79}
80
81impl<E: EntityKind> DeleteExecutor<E> {
82 #[must_use]
83 pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
84 Self {
85 db,
86 debug,
87 _marker: PhantomData,
88 }
89 }
90
91 #[must_use]
92 pub const fn debug(mut self) -> Self {
93 self.debug = true;
94 self
95 }
96
97 pub fn one(self, pk: impl FieldValue) -> Result<Response<E>, InternalError> {
103 let query = DeleteQuery::new().one::<E>(pk);
104 self.execute(query)
105 }
106
107 pub fn only(self) -> Result<Response<E>, InternalError> {
109 let query = DeleteQuery::new().one::<E>(());
110 self.execute(query)
111 }
112
113 pub fn many<I, V>(self, values: I) -> Result<Response<E>, InternalError>
115 where
116 I: IntoIterator<Item = V>,
117 V: FieldValue,
118 {
119 let query = DeleteQuery::new().many_by_field(E::PRIMARY_KEY, values);
120 self.execute(query)
121 }
122
123 pub fn by_unique_index(
129 self,
130 index: UniqueIndexHandle,
131 entity: E,
132 ) -> Result<Response<E>, InternalError>
133 where
134 E::PrimaryKey: FromKey,
135 {
136 let mut span = Span::<E>::new(ExecKind::Delete);
137 let index = index.index();
138 let mut lookup = entity;
139 sanitize(&mut lookup)?;
140
141 let Some(pk) = resolve_unique_pk::<E>(&self.db, index, &lookup)? else {
142 set_rows_from_len(&mut span, 0);
143
144 return Ok(Response(Vec::new()));
145 };
146
147 let (dk, stored) = self.load_existing(pk)?;
148
149 self.db.context::<E>().with_store_mut(|s| {
150 let _unit = WriteUnit::new("delete_unique_row_non_atomic");
153 let raw = dk.to_raw();
154 s.remove(&raw);
155 if !E::INDEXES.is_empty() {
156 self.remove_indexes(&stored)?;
157 }
158
159 Ok::<_, InternalError>(())
160 })??;
161
162 set_rows_from_len(&mut span, 1);
163
164 Ok(Response(vec![(dk.key(), stored)]))
165 }
166
167 pub fn one_by_field(
173 self,
174 field: impl AsRef<str>,
175 value: impl FieldValue,
176 ) -> Result<Response<E>, InternalError> {
177 let query = DeleteQuery::new().one_by_field(field, value);
178 self.execute(query)
179 }
180
181 pub fn many_by_field<I, V>(
183 self,
184 field: impl AsRef<str>,
185 values: I,
186 ) -> Result<Response<E>, InternalError>
187 where
188 I: IntoIterator<Item = V>,
189 V: FieldValue,
190 {
191 let query = DeleteQuery::new().many_by_field(field, values);
192 self.execute(query)
193 }
194
195 pub fn all(self) -> Result<Response<E>, InternalError> {
197 self.execute(DeleteQuery::new())
198 }
199
200 pub fn filter<F, I>(self, f: F) -> Result<Response<E>, InternalError>
202 where
203 F: FnOnce(FilterDsl) -> I,
204 I: IntoFilterExpr,
205 {
206 let query = DeleteQuery::new().filter(f);
207
208 self.execute(query)
209 }
210
211 pub fn ensure_delete_one(self, pk: impl FieldValue) -> Result<(), InternalError> {
216 self.one(pk)?.require_one()?;
217
218 Ok(())
219 }
220
221 pub fn ensure_delete_any_by_pk<I, V>(self, pks: I) -> Result<(), InternalError>
222 where
223 I: IntoIterator<Item = V>,
224 V: FieldValue,
225 {
226 self.many(pks)?.require_some()?;
227
228 Ok(())
229 }
230
231 pub fn ensure_delete_any<I, V>(self, values: I) -> Result<(), InternalError>
232 where
233 I: IntoIterator<Item = V>,
234 V: FieldValue,
235 {
236 self.ensure_delete_any_by_pk(values)
237 }
238
239 pub fn explain(self, query: DeleteQuery) -> Result<QueryPlan, InternalError> {
244 QueryValidate::<E>::validate(&query)?;
245 Ok(plan_for::<E>(query.filter.as_ref()))
246 }
247
248 pub fn execute(self, query: DeleteQuery) -> Result<Response<E>, InternalError> {
256 QueryValidate::<E>::validate(&query)?;
257 let mut span = Span::<E>::new(ExecKind::Delete);
258
259 let plan = plan_for::<E>(query.filter.as_ref());
260 record_plan_metrics(&plan);
261
262 let limit = query
263 .limit
264 .as_ref()
265 .and_then(|l| l.limit)
266 .map(|l| l as usize);
267
268 let offset = query.limit.as_ref().map_or(0, |l| l.offset as usize);
269 let filter_simplified = query.filter.as_ref().map(|f| f.clone().simplify());
270
271 let mut acc = DeleteAccumulator::new(filter_simplified.as_ref(), offset, limit);
272
273 let mut scanned = 0u64;
274 scan_strict::<E, _>(&self.db, plan, |dk, entity| {
275 scanned = scanned.saturating_add(1);
276 if acc.should_stop(dk, entity) {
277 ControlFlow::Break(())
278 } else {
279 ControlFlow::Continue(())
280 }
281 })?;
282
283 sink::record(MetricsEvent::RowsScanned {
285 entity_path: E::PATH,
286 rows_scanned: scanned,
287 });
288
289 let mut res: Vec<(Key, E)> = Vec::with_capacity(acc.matches.len());
290 self.db.context::<E>().with_store_mut(|s| {
291 for (dk, entity) in acc.matches {
293 let _unit = WriteUnit::new("delete_row_non_atomic");
294 let raw = dk.to_raw();
295 s.remove(&raw);
296 if !E::INDEXES.is_empty() {
297 self.remove_indexes(&entity)?;
298 }
299 res.push((dk.key(), entity));
300 }
301
302 Ok::<_, InternalError>(())
303 })??;
304
305 set_rows_from_len(&mut span, res.len());
306
307 Ok(Response(res))
308 }
309
310 fn remove_indexes(&self, entity: &E) -> Result<(), InternalError> {
311 for index in E::INDEXES {
312 let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
313 let removed = store.with_borrow_mut(|this| {
314 this.remove_index_entry(entity, index).map_err(|err| {
315 ExecutorError::corruption(
316 ErrorOrigin::Index,
317 format!(
318 "index corrupted: {} ({}) -> {}",
319 E::PATH,
320 index.fields.join(", "),
321 err
322 ),
323 )
324 })
325 })?;
326 if removed == IndexRemoveOutcome::Removed {
327 sink::record(MetricsEvent::IndexRemove {
328 entity_path: E::PATH,
329 });
330 }
331 }
332
333 Ok(())
334 }
335
336 fn load_existing(&self, pk: E::PrimaryKey) -> Result<(DataKey, E), InternalError> {
337 let data_key = DataKey::new::<E>(pk.into());
338 let row = self.db.context::<E>().read_strict(&data_key)?;
339 let entity = row.try_decode::<E>().map_err(|err| {
340 ExecutorError::corruption(
341 ErrorOrigin::Serialize,
342 format!("failed to deserialize row: {data_key} ({err})"),
343 )
344 })?;
345
346 Ok((data_key, entity))
347 }
348}