icydb_core/db/executor/
delete.rs

1use crate::{
2    IndexSpec, Key,
3    db::{
4        Db,
5        executor::{
6            ExecutorError, FilterEvaluator, UniqueIndexHandle, WriteUnit,
7            plan::{plan_for, scan_plan, set_rows_from_len},
8            resolve_unique_pk,
9        },
10        primitives::{FilterDsl, FilterExpr, FilterExt, IntoFilterExpr},
11        query::{DeleteQuery, QueryPlan, QueryValidate},
12        response::Response,
13        store::DataKey,
14    },
15    deserialize,
16    obs::metrics,
17    runtime_error::RuntimeError,
18    sanitize,
19    traits::{EntityKind, FieldValue, FromKey},
20};
21use std::{marker::PhantomData, ops::ControlFlow};
22
23///
24/// DeleteAccumulator
25///
26
27struct DeleteAccumulator<'f, E> {
28    filter: Option<&'f FilterExpr>,
29    offset: usize,
30    skipped: usize,
31    limit: Option<usize>,
32    matches: Vec<(DataKey, E)>,
33}
34
35impl<'f, E: EntityKind> DeleteAccumulator<'f, E> {
36    fn new(filter: Option<&'f FilterExpr>, offset: usize, limit: Option<usize>) -> Self {
37        Self {
38            filter,
39            offset,
40            skipped: 0,
41            limit,
42            matches: Vec::with_capacity(limit.unwrap_or(0)),
43        }
44    }
45
46    fn limit_reached(&self) -> bool {
47        self.limit.is_some_and(|lim| self.matches.len() >= lim)
48    }
49
50    fn should_stop(&mut self, dk: DataKey, entity: E) -> bool {
51        if let Some(f) = self.filter
52            && !FilterEvaluator::new(&entity).eval(f)
53        {
54            return false;
55        }
56
57        if self.skipped < self.offset {
58            self.skipped += 1;
59            return false;
60        }
61
62        if self.limit_reached() {
63            return true;
64        }
65
66        self.matches.push((dk, entity));
67        false
68    }
69}
70
71///
72/// DeleteExecutor
73///
74
75#[derive(Clone, Copy)]
76pub struct DeleteExecutor<E: EntityKind> {
77    db: Db<E::Canister>,
78    debug: bool,
79    _marker: PhantomData<E>,
80}
81
82impl<E: EntityKind> DeleteExecutor<E> {
83    #[must_use]
84    pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
85        Self {
86            db,
87            debug,
88            _marker: PhantomData,
89        }
90    }
91
92    #[must_use]
93    pub const fn debug(mut self) -> Self {
94        self.debug = true;
95        self
96    }
97
98    // ─────────────────────────────────────────────
99    // PK-BASED HELPERS
100    // ─────────────────────────────────────────────
101
102    /// Delete a single row by primary key.
103    pub fn one(self, pk: impl FieldValue) -> Result<Response<E>, RuntimeError> {
104        let query = DeleteQuery::new().one::<E>(pk);
105        self.execute(query)
106    }
107
108    /// Delete the unit-key row.
109    pub fn only(self) -> Result<Response<E>, RuntimeError> {
110        let query = DeleteQuery::new().one::<E>(());
111        self.execute(query)
112    }
113
114    /// Delete multiple rows by primary keys.
115    pub fn many<I, V>(self, values: I) -> Result<Response<E>, RuntimeError>
116    where
117        I: IntoIterator<Item = V>,
118        V: FieldValue,
119    {
120        let query = DeleteQuery::new().many_by_field(E::PRIMARY_KEY, values);
121
122        self.execute(query)
123    }
124
125    // ─────────────────────────────────────────────
126    // UNIQUE INDEX DELETE
127    // ─────────────────────────────────────────────
128
129    /// Delete a single row using a unique index handle.
130    pub fn by_unique_index(
131        self,
132        index: UniqueIndexHandle,
133        entity: E,
134    ) -> Result<Response<E>, RuntimeError>
135    where
136        E::PrimaryKey: FromKey,
137    {
138        let mut span = metrics::Span::<E>::new(metrics::ExecKind::Delete);
139        let index = index.index();
140        let mut lookup = entity;
141        sanitize(&mut lookup)?;
142
143        let Some(pk) = resolve_unique_pk::<E>(&self.db, index, &lookup)? else {
144            set_rows_from_len(&mut span, 0);
145            return Ok(Response(Vec::new()));
146        };
147
148        let (dk, stored) = self.load_existing(index, pk)?;
149
150        self.db.context::<E>().with_store_mut(|s| {
151            // Delete semantics: data removal happens before index removal.
152            // Corruption risk: if index removal fails, orphaned index entries remain.
153            // Retry-safe only if missing rows are acceptable to the caller.
154            let _unit = WriteUnit::new("delete_unique_row");
155            s.remove(&dk);
156            if !E::INDEXES.is_empty() {
157                self.remove_indexes(&stored)?;
158            }
159            Ok::<_, RuntimeError>(())
160        })??;
161
162        set_rows_from_len(&mut span, 1);
163        Ok(Response(vec![(dk.key(), stored)]))
164    }
165
166    // ─────────────────────────────────────────────
167    // GENERIC FIELD-BASED DELETE
168    // ─────────────────────────────────────────────
169
170    /// Delete a single row by an arbitrary field value.
171    pub fn one_by_field(
172        self,
173        field: impl AsRef<str>,
174        value: impl FieldValue,
175    ) -> Result<Response<E>, RuntimeError> {
176        let query = DeleteQuery::new().one_by_field(field, value);
177        self.execute(query)
178    }
179
180    /// Delete multiple rows by an arbitrary field.
181    pub fn many_by_field<I, V>(
182        self,
183        field: impl AsRef<str>,
184        values: I,
185    ) -> Result<Response<E>, RuntimeError>
186    where
187        I: IntoIterator<Item = V>,
188        V: FieldValue,
189    {
190        let query = DeleteQuery::new().many_by_field(field, values);
191        self.execute(query)
192    }
193
194    /// Delete all rows.
195    pub fn all(self) -> Result<Response<E>, RuntimeError> {
196        self.execute(DeleteQuery::new())
197    }
198
199    /// Apply a filter builder and delete matches.
200    pub fn filter<F, I>(self, f: F) -> Result<Response<E>, RuntimeError>
201    where
202        F: FnOnce(FilterDsl) -> I,
203        I: IntoFilterExpr,
204    {
205        let query = DeleteQuery::new().filter(f);
206        self.execute(query)
207    }
208
209    // ─────────────────────────────────────────────
210    // ENSURE HELPERS
211    // ─────────────────────────────────────────────
212
213    pub fn ensure_delete_one(self, pk: impl FieldValue) -> Result<(), RuntimeError> {
214        self.one(pk)?.require_one()?;
215        Ok(())
216    }
217
218    pub fn ensure_delete_any_by_pk<I, V>(self, pks: I) -> Result<(), RuntimeError>
219    where
220        I: IntoIterator<Item = V>,
221        V: FieldValue,
222    {
223        self.many(pks)?.require_some()?;
224
225        Ok(())
226    }
227
228    pub fn ensure_delete_any<I, V>(self, values: I) -> Result<(), RuntimeError>
229    where
230        I: IntoIterator<Item = V>,
231        V: FieldValue,
232    {
233        self.ensure_delete_any_by_pk(values)
234    }
235
236    // ─────────────────────────────────────────────
237    // EXECUTION
238    // ─────────────────────────────────────────────
239
240    pub fn explain(self, query: DeleteQuery) -> Result<QueryPlan, RuntimeError> {
241        QueryValidate::<E>::validate(&query)?;
242
243        Ok(plan_for::<E>(query.filter.as_ref()))
244    }
245
246    /// Execute a planner-based delete query.
247    ///
248    /// Note: index-planned deletes are best-effort and do not validate unique-index
249    /// invariants. Corrupt index entries may be skipped. Use `by_unique_index` for
250    /// strict unique-index semantics.
251    pub fn execute(self, query: DeleteQuery) -> Result<Response<E>, RuntimeError> {
252        QueryValidate::<E>::validate(&query)?;
253        let mut span = metrics::Span::<E>::new(metrics::ExecKind::Delete);
254
255        let plan = plan_for::<E>(query.filter.as_ref());
256
257        let limit = query
258            .limit
259            .as_ref()
260            .and_then(|l| l.limit)
261            .map(|l| l as usize);
262
263        let offset = query.limit.as_ref().map_or(0, |l| l.offset as usize);
264        let filter_simplified = query.filter.as_ref().map(|f| f.clone().simplify());
265
266        let mut acc = DeleteAccumulator::new(filter_simplified.as_ref(), offset, limit);
267
268        let mut scanned = 0u64;
269        scan_plan::<E, _>(&self.db, plan, |dk, entity| {
270            scanned = scanned.saturating_add(1);
271            if acc.should_stop(dk, entity) {
272                ControlFlow::Break(())
273            } else {
274                ControlFlow::Continue(())
275            }
276        })?;
277
278        metrics::record_rows_scanned_for::<E>(scanned);
279
280        let mut res: Vec<(Key, E)> = Vec::with_capacity(acc.matches.len());
281        self.db.context::<E>().with_store_mut(|s| {
282            // Delete semantics: incremental and non-atomic across data/index stores.
283            // Acceptable: partial deletions remain if an error occurs mid-loop.
284            // Corruption risk: if index removal fails after data removal, indexes
285            // can retain orphaned entries. Retry-safe only if missing rows are ok.
286            for (dk, entity) in acc.matches {
287                let _unit = WriteUnit::new("delete_row");
288                s.remove(&dk);
289                if !E::INDEXES.is_empty() {
290                    self.remove_indexes(&entity)?;
291                }
292                res.push((dk.key(), entity));
293            }
294            Ok::<_, RuntimeError>(())
295        })??;
296
297        set_rows_from_len(&mut span, res.len());
298
299        Ok(Response(res))
300    }
301
302    fn remove_indexes(&self, entity: &E) -> Result<(), RuntimeError> {
303        for index in E::INDEXES {
304            let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
305            store.with_borrow_mut(|this| {
306                this.remove_index_entry(entity, index);
307            });
308        }
309        Ok(())
310    }
311
312    fn load_existing(
313        &self,
314        index: &'static IndexSpec,
315        pk: E::PrimaryKey,
316    ) -> Result<(DataKey, E), RuntimeError> {
317        let data_key = DataKey::new::<E>(pk.into());
318        let bytes = self
319            .db
320            .context::<E>()
321            .with_store(|store| store.get(&data_key))?;
322
323        let Some(bytes) = bytes else {
324            return Err(ExecutorError::IndexCorrupted(
325                E::PATH.to_string(),
326                index.fields.join(", "),
327                1,
328            )
329            .into());
330        };
331
332        let entity = deserialize::<E>(&bytes)?;
333        Ok((data_key, entity))
334    }
335}