1use crate::{
2 db::{
3 Db,
4 executor::{
5 ExecutorError, FilterEvaluator, UniqueIndexHandle, WriteUnit,
6 plan::{plan_for, record_plan_metrics, scan_strict, set_rows_from_len},
7 resolve_unique_pk,
8 },
9 primitives::{FilterDsl, FilterExpr, FilterExt, IntoFilterExpr},
10 query::{DeleteQuery, QueryPlan, QueryValidate},
11 response::Response,
12 store::{DataKey, IndexRemoveOutcome},
13 },
14 error::{ErrorOrigin, InternalError},
15 obs::sink::{self, ExecKind, MetricsEvent, Span},
16 prelude::*,
17 sanitize::sanitize,
18 serialize::deserialize,
19 traits::{EntityKind, FieldValue, FromKey},
20};
21use std::{marker::PhantomData, ops::ControlFlow};
22
23struct DeleteAccumulator<'f, E> {
28 filter: Option<&'f FilterExpr>,
29 offset: usize,
30 skipped: usize,
31 limit: Option<usize>,
32 matches: Vec<(DataKey, E)>,
33}
34
35impl<'f, E: EntityKind> DeleteAccumulator<'f, E> {
36 fn new(filter: Option<&'f FilterExpr>, offset: usize, limit: Option<usize>) -> Self {
37 Self {
38 filter,
39 offset,
40 skipped: 0,
41 limit,
42 matches: Vec::with_capacity(limit.unwrap_or(0)),
43 }
44 }
45
46 fn limit_reached(&self) -> bool {
47 self.limit.is_some_and(|lim| self.matches.len() >= lim)
48 }
49
50 fn should_stop(&mut self, dk: DataKey, entity: E) -> bool {
51 if let Some(f) = self.filter
52 && !FilterEvaluator::new(&entity).eval(f)
53 {
54 return false;
55 }
56
57 if self.skipped < self.offset {
58 self.skipped += 1;
59 return false;
60 }
61
62 if self.limit_reached() {
63 return true;
64 }
65
66 self.matches.push((dk, entity));
67 false
68 }
69}
70
71#[derive(Clone, Copy)]
76pub struct DeleteExecutor<E: EntityKind> {
77 db: Db<E::Canister>,
78 debug: bool,
79 _marker: PhantomData<E>,
80}
81
82impl<E: EntityKind> DeleteExecutor<E> {
83 #[must_use]
84 pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
85 Self {
86 db,
87 debug,
88 _marker: PhantomData,
89 }
90 }
91
92 #[must_use]
93 pub const fn debug(mut self) -> Self {
94 self.debug = true;
95 self
96 }
97
98 pub fn one(self, pk: impl FieldValue) -> Result<Response<E>, InternalError> {
104 let query = DeleteQuery::new().one::<E>(pk);
105 self.execute(query)
106 }
107
108 pub fn only(self) -> Result<Response<E>, InternalError> {
110 let query = DeleteQuery::new().one::<E>(());
111 self.execute(query)
112 }
113
114 pub fn many<I, V>(self, values: I) -> Result<Response<E>, InternalError>
116 where
117 I: IntoIterator<Item = V>,
118 V: FieldValue,
119 {
120 let query = DeleteQuery::new().many_by_field(E::PRIMARY_KEY, values);
121 self.execute(query)
122 }
123
124 pub fn by_unique_index(
130 self,
131 index: UniqueIndexHandle,
132 entity: E,
133 ) -> Result<Response<E>, InternalError>
134 where
135 E::PrimaryKey: FromKey,
136 {
137 let mut span = Span::<E>::new(ExecKind::Delete);
138 let index = index.index();
139 let mut lookup = entity;
140 sanitize(&mut lookup)?;
141
142 let Some(pk) = resolve_unique_pk::<E>(&self.db, index, &lookup)? else {
143 set_rows_from_len(&mut span, 0);
144
145 return Ok(Response(Vec::new()));
146 };
147
148 let (dk, stored) = self.load_existing(pk)?;
149
150 self.db.context::<E>().with_store_mut(|s| {
151 let _unit = WriteUnit::new("delete_unique_row_non_atomic");
154 s.remove(&dk);
155 if !E::INDEXES.is_empty() {
156 self.remove_indexes(&stored)?;
157 }
158
159 Ok::<_, InternalError>(())
160 })??;
161
162 set_rows_from_len(&mut span, 1);
163
164 Ok(Response(vec![(dk.key(), stored)]))
165 }
166
167 pub fn one_by_field(
173 self,
174 field: impl AsRef<str>,
175 value: impl FieldValue,
176 ) -> Result<Response<E>, InternalError> {
177 let query = DeleteQuery::new().one_by_field(field, value);
178 self.execute(query)
179 }
180
181 pub fn many_by_field<I, V>(
183 self,
184 field: impl AsRef<str>,
185 values: I,
186 ) -> Result<Response<E>, InternalError>
187 where
188 I: IntoIterator<Item = V>,
189 V: FieldValue,
190 {
191 let query = DeleteQuery::new().many_by_field(field, values);
192 self.execute(query)
193 }
194
195 pub fn all(self) -> Result<Response<E>, InternalError> {
197 self.execute(DeleteQuery::new())
198 }
199
200 pub fn filter<F, I>(self, f: F) -> Result<Response<E>, InternalError>
202 where
203 F: FnOnce(FilterDsl) -> I,
204 I: IntoFilterExpr,
205 {
206 let query = DeleteQuery::new().filter(f);
207
208 self.execute(query)
209 }
210
211 pub fn ensure_delete_one(self, pk: impl FieldValue) -> Result<(), InternalError> {
216 self.one(pk)?.require_one()?;
217
218 Ok(())
219 }
220
221 pub fn ensure_delete_any_by_pk<I, V>(self, pks: I) -> Result<(), InternalError>
222 where
223 I: IntoIterator<Item = V>,
224 V: FieldValue,
225 {
226 self.many(pks)?.require_some()?;
227
228 Ok(())
229 }
230
231 pub fn ensure_delete_any<I, V>(self, values: I) -> Result<(), InternalError>
232 where
233 I: IntoIterator<Item = V>,
234 V: FieldValue,
235 {
236 self.ensure_delete_any_by_pk(values)
237 }
238
239 pub fn explain(self, query: DeleteQuery) -> Result<QueryPlan, InternalError> {
244 QueryValidate::<E>::validate(&query)?;
245 Ok(plan_for::<E>(query.filter.as_ref()))
246 }
247
248 pub fn execute(self, query: DeleteQuery) -> Result<Response<E>, InternalError> {
256 QueryValidate::<E>::validate(&query)?;
257 let mut span = Span::<E>::new(ExecKind::Delete);
258
259 let plan = plan_for::<E>(query.filter.as_ref());
260 record_plan_metrics(&plan);
261
262 let limit = query
263 .limit
264 .as_ref()
265 .and_then(|l| l.limit)
266 .map(|l| l as usize);
267
268 let offset = query.limit.as_ref().map_or(0, |l| l.offset as usize);
269 let filter_simplified = query.filter.as_ref().map(|f| f.clone().simplify());
270
271 let mut acc = DeleteAccumulator::new(filter_simplified.as_ref(), offset, limit);
272
273 let mut scanned = 0u64;
274 scan_strict::<E, _>(&self.db, plan, |dk, entity| {
275 scanned = scanned.saturating_add(1);
276 if acc.should_stop(dk, entity) {
277 ControlFlow::Break(())
278 } else {
279 ControlFlow::Continue(())
280 }
281 })?;
282
283 sink::record(MetricsEvent::RowsScanned {
285 entity_path: E::PATH,
286 rows_scanned: scanned,
287 });
288
289 let mut res: Vec<(Key, E)> = Vec::with_capacity(acc.matches.len());
290 self.db.context::<E>().with_store_mut(|s| {
291 for (dk, entity) in acc.matches {
293 let _unit = WriteUnit::new("delete_row_non_atomic");
294 s.remove(&dk);
295 if !E::INDEXES.is_empty() {
296 self.remove_indexes(&entity)?;
297 }
298 res.push((dk.key(), entity));
299 }
300
301 Ok::<_, InternalError>(())
302 })??;
303
304 set_rows_from_len(&mut span, res.len());
305
306 Ok(Response(res))
307 }
308
309 fn remove_indexes(&self, entity: &E) -> Result<(), InternalError> {
310 for index in E::INDEXES {
311 let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
312 let removed = store.with_borrow_mut(|this| this.remove_index_entry(entity, index));
313 if removed == IndexRemoveOutcome::Removed {
314 sink::record(MetricsEvent::IndexRemove {
315 entity_path: E::PATH,
316 });
317 }
318 }
319
320 Ok(())
321 }
322
323 fn load_existing(&self, pk: E::PrimaryKey) -> Result<(DataKey, E), InternalError> {
324 let data_key = DataKey::new::<E>(pk.into());
325 let bytes = self.db.context::<E>().read_strict(&data_key)?;
326 let entity = deserialize::<E>(&bytes).map_err(|_| {
327 ExecutorError::corruption(
328 ErrorOrigin::Serialize,
329 format!("failed to deserialize row: {data_key}"),
330 )
331 })?;
332
333 Ok((data_key, entity))
334 }
335}