icydb_core/db/executor/
delete.rs

1use crate::{
2    db::{
3        Db,
4        executor::{
5            ExecutorError, FilterEvaluator, UniqueIndexHandle, WriteUnit,
6            plan::{plan_for, record_plan_metrics, scan_strict, set_rows_from_len},
7            resolve_unique_pk,
8        },
9        primitives::{FilterDsl, FilterExpr, FilterExt, IntoFilterExpr},
10        query::{DeleteQuery, QueryPlan, QueryValidate},
11        response::Response,
12        store::{DataKey, IndexRemoveOutcome},
13    },
14    error::{ErrorOrigin, InternalError},
15    obs::sink::{self, ExecKind, MetricsEvent, Span},
16    prelude::*,
17    sanitize::sanitize,
18    serialize::deserialize,
19    traits::{EntityKind, FieldValue, FromKey},
20};
21use std::{marker::PhantomData, ops::ControlFlow};
22
23///
24/// DeleteAccumulator
25///
26
27struct DeleteAccumulator<'f, E> {
28    filter: Option<&'f FilterExpr>,
29    offset: usize,
30    skipped: usize,
31    limit: Option<usize>,
32    matches: Vec<(DataKey, E)>,
33}
34
35impl<'f, E: EntityKind> DeleteAccumulator<'f, E> {
36    fn new(filter: Option<&'f FilterExpr>, offset: usize, limit: Option<usize>) -> Self {
37        Self {
38            filter,
39            offset,
40            skipped: 0,
41            limit,
42            matches: Vec::with_capacity(limit.unwrap_or(0)),
43        }
44    }
45
46    fn limit_reached(&self) -> bool {
47        self.limit.is_some_and(|lim| self.matches.len() >= lim)
48    }
49
50    fn should_stop(&mut self, dk: DataKey, entity: E) -> bool {
51        if let Some(f) = self.filter
52            && !FilterEvaluator::new(&entity).eval(f)
53        {
54            return false;
55        }
56
57        if self.skipped < self.offset {
58            self.skipped += 1;
59            return false;
60        }
61
62        if self.limit_reached() {
63            return true;
64        }
65
66        self.matches.push((dk, entity));
67        false
68    }
69}
70
71///
72/// DeleteExecutor
73///
74
75#[derive(Clone, Copy)]
76pub struct DeleteExecutor<E: EntityKind> {
77    db: Db<E::Canister>,
78    debug: bool,
79    _marker: PhantomData<E>,
80}
81
82impl<E: EntityKind> DeleteExecutor<E> {
83    #[must_use]
84    pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
85        Self {
86            db,
87            debug,
88            _marker: PhantomData,
89        }
90    }
91
92    #[must_use]
93    pub const fn debug(mut self) -> Self {
94        self.debug = true;
95        self
96    }
97
98    // ─────────────────────────────────────────────
99    // PK-BASED HELPERS
100    // ─────────────────────────────────────────────
101
102    /// Delete a single row by primary key.
103    pub fn one(self, pk: impl FieldValue) -> Result<Response<E>, InternalError> {
104        let query = DeleteQuery::new().one::<E>(pk);
105        self.execute(query)
106    }
107
108    /// Delete the unit-key row.
109    pub fn only(self) -> Result<Response<E>, InternalError> {
110        let query = DeleteQuery::new().one::<E>(());
111        self.execute(query)
112    }
113
114    /// Delete multiple rows by primary keys.
115    pub fn many<I, V>(self, values: I) -> Result<Response<E>, InternalError>
116    where
117        I: IntoIterator<Item = V>,
118        V: FieldValue,
119    {
120        let query = DeleteQuery::new().many_by_field(E::PRIMARY_KEY, values);
121        self.execute(query)
122    }
123
124    // ─────────────────────────────────────────────
125    // UNIQUE INDEX DELETE
126    // ─────────────────────────────────────────────
127
128    /// Delete a single row using a unique index handle.
129    pub fn by_unique_index(
130        self,
131        index: UniqueIndexHandle,
132        entity: E,
133    ) -> Result<Response<E>, InternalError>
134    where
135        E::PrimaryKey: FromKey,
136    {
137        let mut span = Span::<E>::new(ExecKind::Delete);
138        let index = index.index();
139        let mut lookup = entity;
140        sanitize(&mut lookup)?;
141
142        let Some(pk) = resolve_unique_pk::<E>(&self.db, index, &lookup)? else {
143            set_rows_from_len(&mut span, 0);
144
145            return Ok(Response(Vec::new()));
146        };
147
148        let (dk, stored) = self.load_existing(pk)?;
149
150        self.db.context::<E>().with_store_mut(|s| {
151            // Non-atomic delete: data removal happens before index removal.
152            // If index removal fails, orphaned index entries may remain.
153            let _unit = WriteUnit::new("delete_unique_row_non_atomic");
154            s.remove(&dk);
155            if !E::INDEXES.is_empty() {
156                self.remove_indexes(&stored)?;
157            }
158
159            Ok::<_, InternalError>(())
160        })??;
161
162        set_rows_from_len(&mut span, 1);
163
164        Ok(Response(vec![(dk.key(), stored)]))
165    }
166
167    // ─────────────────────────────────────────────
168    // GENERIC FIELD-BASED DELETE
169    // ─────────────────────────────────────────────
170
171    /// Delete a single row by an arbitrary field value.
172    pub fn one_by_field(
173        self,
174        field: impl AsRef<str>,
175        value: impl FieldValue,
176    ) -> Result<Response<E>, InternalError> {
177        let query = DeleteQuery::new().one_by_field(field, value);
178        self.execute(query)
179    }
180
181    /// Delete multiple rows by an arbitrary field.
182    pub fn many_by_field<I, V>(
183        self,
184        field: impl AsRef<str>,
185        values: I,
186    ) -> Result<Response<E>, InternalError>
187    where
188        I: IntoIterator<Item = V>,
189        V: FieldValue,
190    {
191        let query = DeleteQuery::new().many_by_field(field, values);
192        self.execute(query)
193    }
194
195    /// Delete all rows.
196    pub fn all(self) -> Result<Response<E>, InternalError> {
197        self.execute(DeleteQuery::new())
198    }
199
200    /// Apply a filter builder and delete matches.
201    pub fn filter<F, I>(self, f: F) -> Result<Response<E>, InternalError>
202    where
203        F: FnOnce(FilterDsl) -> I,
204        I: IntoFilterExpr,
205    {
206        let query = DeleteQuery::new().filter(f);
207
208        self.execute(query)
209    }
210
211    // ─────────────────────────────────────────────
212    // ENSURE HELPERS
213    // ─────────────────────────────────────────────
214
215    pub fn ensure_delete_one(self, pk: impl FieldValue) -> Result<(), InternalError> {
216        self.one(pk)?.require_one()?;
217
218        Ok(())
219    }
220
221    pub fn ensure_delete_any_by_pk<I, V>(self, pks: I) -> Result<(), InternalError>
222    where
223        I: IntoIterator<Item = V>,
224        V: FieldValue,
225    {
226        self.many(pks)?.require_some()?;
227
228        Ok(())
229    }
230
231    pub fn ensure_delete_any<I, V>(self, values: I) -> Result<(), InternalError>
232    where
233        I: IntoIterator<Item = V>,
234        V: FieldValue,
235    {
236        self.ensure_delete_any_by_pk(values)
237    }
238
239    // ─────────────────────────────────────────────
240    // EXECUTION
241    // ─────────────────────────────────────────────
242
243    pub fn explain(self, query: DeleteQuery) -> Result<QueryPlan, InternalError> {
244        QueryValidate::<E>::validate(&query)?;
245        Ok(plan_for::<E>(query.filter.as_ref()))
246    }
247
248    /// Execute a planner-based delete query.
249    ///
250    /// NOTE:
251    /// - Planner-based deletes are strict on row integrity (missing/malformed rows
252    ///   surface corruption).
253    /// - Planner-based deletes DO NOT enforce unique-index invariants.
254    ///   Use `by_unique_index` for strict unique-index semantics.
255    pub fn execute(self, query: DeleteQuery) -> Result<Response<E>, InternalError> {
256        QueryValidate::<E>::validate(&query)?;
257        let mut span = Span::<E>::new(ExecKind::Delete);
258
259        let plan = plan_for::<E>(query.filter.as_ref());
260        record_plan_metrics(&plan);
261
262        let limit = query
263            .limit
264            .as_ref()
265            .and_then(|l| l.limit)
266            .map(|l| l as usize);
267
268        let offset = query.limit.as_ref().map_or(0, |l| l.offset as usize);
269        let filter_simplified = query.filter.as_ref().map(|f| f.clone().simplify());
270
271        let mut acc = DeleteAccumulator::new(filter_simplified.as_ref(), offset, limit);
272
273        let mut scanned = 0u64;
274        scan_strict::<E, _>(&self.db, plan, |dk, entity| {
275            scanned = scanned.saturating_add(1);
276            if acc.should_stop(dk, entity) {
277                ControlFlow::Break(())
278            } else {
279                ControlFlow::Continue(())
280            }
281        })?;
282
283        // rows_scanned counts evaluated rows, not deleted rows
284        sink::record(MetricsEvent::RowsScanned {
285            entity_path: E::PATH,
286            rows_scanned: scanned,
287        });
288
289        let mut res: Vec<(Key, E)> = Vec::with_capacity(acc.matches.len());
290        self.db.context::<E>().with_store_mut(|s| {
291            // Non-atomic delete loop: partial deletions may persist on failure.
292            for (dk, entity) in acc.matches {
293                let _unit = WriteUnit::new("delete_row_non_atomic");
294                s.remove(&dk);
295                if !E::INDEXES.is_empty() {
296                    self.remove_indexes(&entity)?;
297                }
298                res.push((dk.key(), entity));
299            }
300
301            Ok::<_, InternalError>(())
302        })??;
303
304        set_rows_from_len(&mut span, res.len());
305
306        Ok(Response(res))
307    }
308
309    fn remove_indexes(&self, entity: &E) -> Result<(), InternalError> {
310        for index in E::INDEXES {
311            let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
312            let removed = store.with_borrow_mut(|this| this.remove_index_entry(entity, index));
313            if removed == IndexRemoveOutcome::Removed {
314                sink::record(MetricsEvent::IndexRemove {
315                    entity_path: E::PATH,
316                });
317            }
318        }
319
320        Ok(())
321    }
322
323    fn load_existing(&self, pk: E::PrimaryKey) -> Result<(DataKey, E), InternalError> {
324        let data_key = DataKey::new::<E>(pk.into());
325        let bytes = self.db.context::<E>().read_strict(&data_key)?;
326        let entity = deserialize::<E>(&bytes).map_err(|_| {
327            ExecutorError::corruption(
328                ErrorOrigin::Serialize,
329                format!("failed to deserialize row: {data_key}"),
330            )
331        })?;
332
333        Ok((data_key, entity))
334    }
335}