Skip to main content

icydb_core/db/executor/
delete.rs

1use crate::{
2    db::{
3        Db,
4        executor::{
5            ExecutorError, FilterEvaluator, UniqueIndexHandle, WriteUnit,
6            plan::{plan_for, record_plan_metrics, scan_strict, set_rows_from_len},
7            resolve_unique_pk,
8        },
9        primitives::{FilterDsl, FilterExpr, FilterExt, IntoFilterExpr},
10        query::{DeleteQuery, QueryPlan, QueryValidate},
11        response::Response,
12        store::{DataKey, IndexRemoveOutcome},
13    },
14    error::{ErrorOrigin, InternalError},
15    obs::sink::{self, ExecKind, MetricsEvent, Span},
16    prelude::*,
17    sanitize::sanitize,
18    traits::{EntityKind, FieldValue, FromKey},
19};
20use std::{marker::PhantomData, ops::ControlFlow};
21
22///
23/// DeleteAccumulator
24///
25
26struct DeleteAccumulator<'f, E> {
27    filter: Option<&'f FilterExpr>,
28    offset: usize,
29    skipped: usize,
30    limit: Option<usize>,
31    matches: Vec<(DataKey, E)>,
32}
33
34impl<'f, E: EntityKind> DeleteAccumulator<'f, E> {
35    fn new(filter: Option<&'f FilterExpr>, offset: usize, limit: Option<usize>) -> Self {
36        Self {
37            filter,
38            offset,
39            skipped: 0,
40            limit,
41            matches: Vec::with_capacity(limit.unwrap_or(0)),
42        }
43    }
44
45    fn limit_reached(&self) -> bool {
46        self.limit.is_some_and(|lim| self.matches.len() >= lim)
47    }
48
49    fn should_stop(&mut self, dk: DataKey, entity: E) -> bool {
50        if let Some(f) = self.filter
51            && !FilterEvaluator::new(&entity).eval(f)
52        {
53            return false;
54        }
55
56        if self.skipped < self.offset {
57            self.skipped += 1;
58            return false;
59        }
60
61        if self.limit_reached() {
62            return true;
63        }
64
65        self.matches.push((dk, entity));
66        false
67    }
68}
69
70///
71/// DeleteExecutor
72///
73
74#[derive(Clone, Copy)]
75pub struct DeleteExecutor<E: EntityKind> {
76    db: Db<E::Canister>,
77    debug: bool,
78    _marker: PhantomData<E>,
79}
80
81impl<E: EntityKind> DeleteExecutor<E> {
82    #[must_use]
83    pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
84        Self {
85            db,
86            debug,
87            _marker: PhantomData,
88        }
89    }
90
91    #[must_use]
92    pub const fn debug(mut self) -> Self {
93        self.debug = true;
94        self
95    }
96
97    // ─────────────────────────────────────────────
98    // PK-BASED HELPERS
99    // ─────────────────────────────────────────────
100
101    /// Delete a single row by primary key.
102    pub fn one(self, pk: impl FieldValue) -> Result<Response<E>, InternalError> {
103        let query = DeleteQuery::new().one::<E>(pk);
104        self.execute(query)
105    }
106
107    /// Delete the unit-key row.
108    pub fn only(self) -> Result<Response<E>, InternalError> {
109        let query = DeleteQuery::new().one::<E>(());
110        self.execute(query)
111    }
112
113    /// Delete multiple rows by primary keys.
114    pub fn many<I, V>(self, values: I) -> Result<Response<E>, InternalError>
115    where
116        I: IntoIterator<Item = V>,
117        V: FieldValue,
118    {
119        let query = DeleteQuery::new().many_by_field(E::PRIMARY_KEY, values);
120        self.execute(query)
121    }
122
123    // ─────────────────────────────────────────────
124    // UNIQUE INDEX DELETE
125    // ─────────────────────────────────────────────
126
127    /// Delete a single row using a unique index handle.
128    pub fn by_unique_index(
129        self,
130        index: UniqueIndexHandle,
131        entity: E,
132    ) -> Result<Response<E>, InternalError>
133    where
134        E::PrimaryKey: FromKey,
135    {
136        let mut span = Span::<E>::new(ExecKind::Delete);
137        let index = index.index();
138        let mut lookup = entity;
139        sanitize(&mut lookup)?;
140
141        let Some(pk) = resolve_unique_pk::<E>(&self.db, index, &lookup)? else {
142            set_rows_from_len(&mut span, 0);
143
144            return Ok(Response(Vec::new()));
145        };
146
147        let (dk, stored) = self.load_existing(pk)?;
148
149        self.db.context::<E>().with_store_mut(|s| {
150            // Non-atomic delete: data removal happens before index removal.
151            // If index removal fails, orphaned index entries may remain.
152            let _unit = WriteUnit::new("delete_unique_row_non_atomic");
153            let raw = dk.to_raw();
154            s.remove(&raw);
155            if !E::INDEXES.is_empty() {
156                self.remove_indexes(&stored)?;
157            }
158
159            Ok::<_, InternalError>(())
160        })??;
161
162        set_rows_from_len(&mut span, 1);
163
164        Ok(Response(vec![(dk.key(), stored)]))
165    }
166
167    // ─────────────────────────────────────────────
168    // GENERIC FIELD-BASED DELETE
169    // ─────────────────────────────────────────────
170
171    /// Delete a single row by an arbitrary field value.
172    pub fn one_by_field(
173        self,
174        field: impl AsRef<str>,
175        value: impl FieldValue,
176    ) -> Result<Response<E>, InternalError> {
177        let query = DeleteQuery::new().one_by_field(field, value);
178        self.execute(query)
179    }
180
181    /// Delete multiple rows by an arbitrary field.
182    pub fn many_by_field<I, V>(
183        self,
184        field: impl AsRef<str>,
185        values: I,
186    ) -> Result<Response<E>, InternalError>
187    where
188        I: IntoIterator<Item = V>,
189        V: FieldValue,
190    {
191        let query = DeleteQuery::new().many_by_field(field, values);
192        self.execute(query)
193    }
194
195    /// Delete all rows.
196    pub fn all(self) -> Result<Response<E>, InternalError> {
197        self.execute(DeleteQuery::new())
198    }
199
200    /// Apply a filter builder and delete matches.
201    pub fn filter<F, I>(self, f: F) -> Result<Response<E>, InternalError>
202    where
203        F: FnOnce(FilterDsl) -> I,
204        I: IntoFilterExpr,
205    {
206        let query = DeleteQuery::new().filter(f);
207
208        self.execute(query)
209    }
210
211    // ─────────────────────────────────────────────
212    // ENSURE HELPERS
213    // ─────────────────────────────────────────────
214
215    pub fn ensure_delete_one(self, pk: impl FieldValue) -> Result<(), InternalError> {
216        self.one(pk)?.require_one()?;
217
218        Ok(())
219    }
220
221    pub fn ensure_delete_any_by_pk<I, V>(self, pks: I) -> Result<(), InternalError>
222    where
223        I: IntoIterator<Item = V>,
224        V: FieldValue,
225    {
226        self.many(pks)?.require_some()?;
227
228        Ok(())
229    }
230
231    pub fn ensure_delete_any<I, V>(self, values: I) -> Result<(), InternalError>
232    where
233        I: IntoIterator<Item = V>,
234        V: FieldValue,
235    {
236        self.ensure_delete_any_by_pk(values)
237    }
238
239    // ─────────────────────────────────────────────
240    // EXECUTION
241    // ─────────────────────────────────────────────
242
243    pub fn explain(self, query: DeleteQuery) -> Result<QueryPlan, InternalError> {
244        QueryValidate::<E>::validate(&query)?;
245        Ok(plan_for::<E>(query.filter.as_ref()))
246    }
247
248    /// Execute a planner-based delete query.
249    ///
250    /// NOTE:
251    /// - Planner-based deletes are strict on row integrity (missing/malformed rows
252    ///   surface corruption).
253    /// - Planner-based deletes DO NOT enforce unique-index invariants.
254    ///   Use `by_unique_index` for strict unique-index semantics.
255    pub fn execute(self, query: DeleteQuery) -> Result<Response<E>, InternalError> {
256        QueryValidate::<E>::validate(&query)?;
257        let mut span = Span::<E>::new(ExecKind::Delete);
258
259        let plan = plan_for::<E>(query.filter.as_ref());
260        record_plan_metrics(&plan);
261
262        let limit = query
263            .limit
264            .as_ref()
265            .and_then(|l| l.limit)
266            .map(|l| l as usize);
267
268        let offset = query.limit.as_ref().map_or(0, |l| l.offset as usize);
269        let filter_simplified = query.filter.as_ref().map(|f| f.clone().simplify());
270
271        let mut acc = DeleteAccumulator::new(filter_simplified.as_ref(), offset, limit);
272
273        let mut scanned = 0u64;
274        scan_strict::<E, _>(&self.db, plan, |dk, entity| {
275            scanned = scanned.saturating_add(1);
276            if acc.should_stop(dk, entity) {
277                ControlFlow::Break(())
278            } else {
279                ControlFlow::Continue(())
280            }
281        })?;
282
283        // rows_scanned counts evaluated rows, not deleted rows
284        sink::record(MetricsEvent::RowsScanned {
285            entity_path: E::PATH,
286            rows_scanned: scanned,
287        });
288
289        let mut res: Vec<(Key, E)> = Vec::with_capacity(acc.matches.len());
290        self.db.context::<E>().with_store_mut(|s| {
291            // Non-atomic delete loop: partial deletions may persist on failure.
292            for (dk, entity) in acc.matches {
293                let _unit = WriteUnit::new("delete_row_non_atomic");
294                let raw = dk.to_raw();
295                s.remove(&raw);
296                if !E::INDEXES.is_empty() {
297                    self.remove_indexes(&entity)?;
298                }
299                res.push((dk.key(), entity));
300            }
301
302            Ok::<_, InternalError>(())
303        })??;
304
305        set_rows_from_len(&mut span, res.len());
306
307        Ok(Response(res))
308    }
309
310    fn remove_indexes(&self, entity: &E) -> Result<(), InternalError> {
311        for index in E::INDEXES {
312            let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
313            let removed = store.with_borrow_mut(|this| {
314                this.remove_index_entry(entity, index).map_err(|err| {
315                    ExecutorError::corruption(
316                        ErrorOrigin::Index,
317                        format!(
318                            "index corrupted: {} ({}) -> {}",
319                            E::PATH,
320                            index.fields.join(", "),
321                            err
322                        ),
323                    )
324                })
325            })?;
326            if removed == IndexRemoveOutcome::Removed {
327                sink::record(MetricsEvent::IndexRemove {
328                    entity_path: E::PATH,
329                });
330            }
331        }
332
333        Ok(())
334    }
335
336    fn load_existing(&self, pk: E::PrimaryKey) -> Result<(DataKey, E), InternalError> {
337        let data_key = DataKey::new::<E>(pk.into());
338        let row = self.db.context::<E>().read_strict(&data_key)?;
339        let entity = row.try_decode::<E>().map_err(|err| {
340            ExecutorError::corruption(
341                ErrorOrigin::Serialize,
342                format!("failed to deserialize row: {data_key} ({err})"),
343            )
344        })?;
345
346        Ok((data_key, entity))
347    }
348}