1use crate::{
2 db::{
3 Db,
4 executor::{
5 ExecutorError, FilterEvaluator, UniqueIndexHandle, WriteUnit,
6 plan::{plan_for, scan_plan, set_rows_from_len},
7 resolve_unique_pk,
8 },
9 primitives::{FilterDsl, FilterExpr, FilterExt, IntoFilterExpr},
10 query::{DeleteQuery, QueryPlan, QueryValidate},
11 response::Response,
12 store::DataKey,
13 },
14 error::InternalError,
15 obs::sink::{self, ExecKind, MetricsEvent, Span},
16 prelude::*,
17 sanitize::sanitize,
18 serialize::deserialize,
19 traits::{EntityKind, FieldValue, FromKey},
20};
21use std::{marker::PhantomData, ops::ControlFlow};
22
23struct DeleteAccumulator<'f, E> {
28 filter: Option<&'f FilterExpr>,
29 offset: usize,
30 skipped: usize,
31 limit: Option<usize>,
32 matches: Vec<(DataKey, E)>,
33}
34
35impl<'f, E: EntityKind> DeleteAccumulator<'f, E> {
36 fn new(filter: Option<&'f FilterExpr>, offset: usize, limit: Option<usize>) -> Self {
37 Self {
38 filter,
39 offset,
40 skipped: 0,
41 limit,
42 matches: Vec::with_capacity(limit.unwrap_or(0)),
43 }
44 }
45
46 fn limit_reached(&self) -> bool {
47 self.limit.is_some_and(|lim| self.matches.len() >= lim)
48 }
49
50 fn should_stop(&mut self, dk: DataKey, entity: E) -> bool {
51 if let Some(f) = self.filter
52 && !FilterEvaluator::new(&entity).eval(f)
53 {
54 return false;
55 }
56
57 if self.skipped < self.offset {
58 self.skipped += 1;
59 return false;
60 }
61
62 if self.limit_reached() {
63 return true;
64 }
65
66 self.matches.push((dk, entity));
67 false
68 }
69}
70
71#[derive(Clone, Copy)]
76pub struct DeleteExecutor<E: EntityKind> {
77 db: Db<E::Canister>,
78 debug: bool,
79 _marker: PhantomData<E>,
80}
81
82impl<E: EntityKind> DeleteExecutor<E> {
83 #[must_use]
84 pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
85 Self {
86 db,
87 debug,
88 _marker: PhantomData,
89 }
90 }
91
92 #[must_use]
93 pub const fn debug(mut self) -> Self {
94 self.debug = true;
95 self
96 }
97
98 pub fn one(self, pk: impl FieldValue) -> Result<Response<E>, InternalError> {
104 let query = DeleteQuery::new().one::<E>(pk);
105 self.execute(query)
106 }
107
108 pub fn only(self) -> Result<Response<E>, InternalError> {
110 let query = DeleteQuery::new().one::<E>(());
111 self.execute(query)
112 }
113
114 pub fn many<I, V>(self, values: I) -> Result<Response<E>, InternalError>
116 where
117 I: IntoIterator<Item = V>,
118 V: FieldValue,
119 {
120 let query = DeleteQuery::new().many_by_field(E::PRIMARY_KEY, values);
121
122 self.execute(query)
123 }
124
125 pub fn by_unique_index(
131 self,
132 index: UniqueIndexHandle,
133 entity: E,
134 ) -> Result<Response<E>, InternalError>
135 where
136 E::PrimaryKey: FromKey,
137 {
138 let mut span = Span::<E>::new(ExecKind::Delete);
139 let index = index.index();
140 let mut lookup = entity;
141 sanitize(&mut lookup)?;
142
143 let Some(pk) = resolve_unique_pk::<E>(&self.db, index, &lookup)? else {
144 set_rows_from_len(&mut span, 0);
145 return Ok(Response(Vec::new()));
146 };
147
148 let (dk, stored) = self.load_existing(index, pk)?;
149
150 self.db.context::<E>().with_store_mut(|s| {
151 let _unit = WriteUnit::new("delete_unique_row");
155 s.remove(&dk);
156 if !E::INDEXES.is_empty() {
157 self.remove_indexes(&stored)?;
158 }
159 Ok::<_, InternalError>(())
160 })??;
161
162 set_rows_from_len(&mut span, 1);
163 Ok(Response(vec![(dk.key(), stored)]))
164 }
165
166 pub fn one_by_field(
172 self,
173 field: impl AsRef<str>,
174 value: impl FieldValue,
175 ) -> Result<Response<E>, InternalError> {
176 let query = DeleteQuery::new().one_by_field(field, value);
177 self.execute(query)
178 }
179
180 pub fn many_by_field<I, V>(
182 self,
183 field: impl AsRef<str>,
184 values: I,
185 ) -> Result<Response<E>, InternalError>
186 where
187 I: IntoIterator<Item = V>,
188 V: FieldValue,
189 {
190 let query = DeleteQuery::new().many_by_field(field, values);
191 self.execute(query)
192 }
193
194 pub fn all(self) -> Result<Response<E>, InternalError> {
196 self.execute(DeleteQuery::new())
197 }
198
199 pub fn filter<F, I>(self, f: F) -> Result<Response<E>, InternalError>
201 where
202 F: FnOnce(FilterDsl) -> I,
203 I: IntoFilterExpr,
204 {
205 let query = DeleteQuery::new().filter(f);
206 self.execute(query)
207 }
208
209 pub fn ensure_delete_one(self, pk: impl FieldValue) -> Result<(), InternalError> {
214 self.one(pk)?.require_one()?;
215 Ok(())
216 }
217
218 pub fn ensure_delete_any_by_pk<I, V>(self, pks: I) -> Result<(), InternalError>
219 where
220 I: IntoIterator<Item = V>,
221 V: FieldValue,
222 {
223 self.many(pks)?.require_some()?;
224
225 Ok(())
226 }
227
228 pub fn ensure_delete_any<I, V>(self, values: I) -> Result<(), InternalError>
229 where
230 I: IntoIterator<Item = V>,
231 V: FieldValue,
232 {
233 self.ensure_delete_any_by_pk(values)
234 }
235
236 pub fn explain(self, query: DeleteQuery) -> Result<QueryPlan, InternalError> {
241 QueryValidate::<E>::validate(&query)?;
242
243 Ok(plan_for::<E>(query.filter.as_ref()))
244 }
245
246 pub fn execute(self, query: DeleteQuery) -> Result<Response<E>, InternalError> {
252 QueryValidate::<E>::validate(&query)?;
253 let mut span = Span::<E>::new(ExecKind::Delete);
254
255 let plan = plan_for::<E>(query.filter.as_ref());
256
257 let limit = query
258 .limit
259 .as_ref()
260 .and_then(|l| l.limit)
261 .map(|l| l as usize);
262
263 let offset = query.limit.as_ref().map_or(0, |l| l.offset as usize);
264 let filter_simplified = query.filter.as_ref().map(|f| f.clone().simplify());
265
266 let mut acc = DeleteAccumulator::new(filter_simplified.as_ref(), offset, limit);
267
268 let mut scanned = 0u64;
269 scan_plan::<E, _>(&self.db, plan, |dk, entity| {
270 scanned = scanned.saturating_add(1);
271 if acc.should_stop(dk, entity) {
272 ControlFlow::Break(())
273 } else {
274 ControlFlow::Continue(())
275 }
276 })?;
277
278 sink::record(MetricsEvent::RowsScanned {
279 entity_path: E::PATH,
280 rows_scanned: scanned,
281 });
282
283 let mut res: Vec<(Key, E)> = Vec::with_capacity(acc.matches.len());
284 self.db.context::<E>().with_store_mut(|s| {
285 for (dk, entity) in acc.matches {
290 let _unit = WriteUnit::new("delete_row");
291 s.remove(&dk);
292 if !E::INDEXES.is_empty() {
293 self.remove_indexes(&entity)?;
294 }
295 res.push((dk.key(), entity));
296 }
297 Ok::<_, InternalError>(())
298 })??;
299
300 set_rows_from_len(&mut span, res.len());
301
302 Ok(Response(res))
303 }
304
305 fn remove_indexes(&self, entity: &E) -> Result<(), InternalError> {
306 for index in E::INDEXES {
307 let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
308 store.with_borrow_mut(|this| {
309 this.remove_index_entry(entity, index);
310 });
311 }
312 Ok(())
313 }
314
315 fn load_existing(
316 &self,
317 index: &'static IndexModel,
318 pk: E::PrimaryKey,
319 ) -> Result<(DataKey, E), InternalError> {
320 let data_key = DataKey::new::<E>(pk.into());
321 let bytes = self
322 .db
323 .context::<E>()
324 .with_store(|store| store.get(&data_key))?;
325
326 let Some(bytes) = bytes else {
327 return Err(ExecutorError::IndexCorrupted(
328 E::PATH.to_string(),
329 index.fields.join(", "),
330 1,
331 )
332 .into());
333 };
334
335 let entity = deserialize::<E>(&bytes)?;
336 Ok((data_key, entity))
337 }
338}