icydb_core/db/executor/
delete.rs

1use crate::{
2    db::{
3        Db,
4        executor::{
5            ExecutorError, FilterEvaluator, UniqueIndexHandle, WriteUnit,
6            plan::{plan_for, scan_plan, set_rows_from_len},
7            resolve_unique_pk,
8        },
9        primitives::{FilterDsl, FilterExpr, FilterExt, IntoFilterExpr},
10        query::{DeleteQuery, QueryPlan, QueryValidate},
11        response::Response,
12        store::DataKey,
13    },
14    error::InternalError,
15    obs::sink::{self, ExecKind, MetricsEvent, Span},
16    prelude::*,
17    sanitize::sanitize,
18    serialize::deserialize,
19    traits::{EntityKind, FieldValue, FromKey},
20};
21use std::{marker::PhantomData, ops::ControlFlow};
22
23///
24/// DeleteAccumulator
25///
26
27struct DeleteAccumulator<'f, E> {
28    filter: Option<&'f FilterExpr>,
29    offset: usize,
30    skipped: usize,
31    limit: Option<usize>,
32    matches: Vec<(DataKey, E)>,
33}
34
35impl<'f, E: EntityKind> DeleteAccumulator<'f, E> {
36    fn new(filter: Option<&'f FilterExpr>, offset: usize, limit: Option<usize>) -> Self {
37        Self {
38            filter,
39            offset,
40            skipped: 0,
41            limit,
42            matches: Vec::with_capacity(limit.unwrap_or(0)),
43        }
44    }
45
46    fn limit_reached(&self) -> bool {
47        self.limit.is_some_and(|lim| self.matches.len() >= lim)
48    }
49
50    fn should_stop(&mut self, dk: DataKey, entity: E) -> bool {
51        if let Some(f) = self.filter
52            && !FilterEvaluator::new(&entity).eval(f)
53        {
54            return false;
55        }
56
57        if self.skipped < self.offset {
58            self.skipped += 1;
59            return false;
60        }
61
62        if self.limit_reached() {
63            return true;
64        }
65
66        self.matches.push((dk, entity));
67        false
68    }
69}
70
71///
72/// DeleteExecutor
73///
74
75#[derive(Clone, Copy)]
76pub struct DeleteExecutor<E: EntityKind> {
77    db: Db<E::Canister>,
78    debug: bool,
79    _marker: PhantomData<E>,
80}
81
82impl<E: EntityKind> DeleteExecutor<E> {
83    #[must_use]
84    pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
85        Self {
86            db,
87            debug,
88            _marker: PhantomData,
89        }
90    }
91
92    #[must_use]
93    pub const fn debug(mut self) -> Self {
94        self.debug = true;
95        self
96    }
97
98    // ─────────────────────────────────────────────
99    // PK-BASED HELPERS
100    // ─────────────────────────────────────────────
101
102    /// Delete a single row by primary key.
103    pub fn one(self, pk: impl FieldValue) -> Result<Response<E>, InternalError> {
104        let query = DeleteQuery::new().one::<E>(pk);
105        self.execute(query)
106    }
107
108    /// Delete the unit-key row.
109    pub fn only(self) -> Result<Response<E>, InternalError> {
110        let query = DeleteQuery::new().one::<E>(());
111        self.execute(query)
112    }
113
114    /// Delete multiple rows by primary keys.
115    pub fn many<I, V>(self, values: I) -> Result<Response<E>, InternalError>
116    where
117        I: IntoIterator<Item = V>,
118        V: FieldValue,
119    {
120        let query = DeleteQuery::new().many_by_field(E::PRIMARY_KEY, values);
121
122        self.execute(query)
123    }
124
125    // ─────────────────────────────────────────────
126    // UNIQUE INDEX DELETE
127    // ─────────────────────────────────────────────
128
129    /// Delete a single row using a unique index handle.
130    pub fn by_unique_index(
131        self,
132        index: UniqueIndexHandle,
133        entity: E,
134    ) -> Result<Response<E>, InternalError>
135    where
136        E::PrimaryKey: FromKey,
137    {
138        let mut span = Span::<E>::new(ExecKind::Delete);
139        let index = index.index();
140        let mut lookup = entity;
141        sanitize(&mut lookup)?;
142
143        let Some(pk) = resolve_unique_pk::<E>(&self.db, index, &lookup)? else {
144            set_rows_from_len(&mut span, 0);
145            return Ok(Response(Vec::new()));
146        };
147
148        let (dk, stored) = self.load_existing(index, pk)?;
149
150        self.db.context::<E>().with_store_mut(|s| {
151            // Delete semantics: data removal happens before index removal.
152            // Corruption risk: if index removal fails, orphaned index entries remain.
153            // Retry-safe only if missing rows are acceptable to the caller.
154            let _unit = WriteUnit::new("delete_unique_row");
155            s.remove(&dk);
156            if !E::INDEXES.is_empty() {
157                self.remove_indexes(&stored)?;
158            }
159            Ok::<_, InternalError>(())
160        })??;
161
162        set_rows_from_len(&mut span, 1);
163        Ok(Response(vec![(dk.key(), stored)]))
164    }
165
166    // ─────────────────────────────────────────────
167    // GENERIC FIELD-BASED DELETE
168    // ─────────────────────────────────────────────
169
170    /// Delete a single row by an arbitrary field value.
171    pub fn one_by_field(
172        self,
173        field: impl AsRef<str>,
174        value: impl FieldValue,
175    ) -> Result<Response<E>, InternalError> {
176        let query = DeleteQuery::new().one_by_field(field, value);
177        self.execute(query)
178    }
179
180    /// Delete multiple rows by an arbitrary field.
181    pub fn many_by_field<I, V>(
182        self,
183        field: impl AsRef<str>,
184        values: I,
185    ) -> Result<Response<E>, InternalError>
186    where
187        I: IntoIterator<Item = V>,
188        V: FieldValue,
189    {
190        let query = DeleteQuery::new().many_by_field(field, values);
191        self.execute(query)
192    }
193
194    /// Delete all rows.
195    pub fn all(self) -> Result<Response<E>, InternalError> {
196        self.execute(DeleteQuery::new())
197    }
198
199    /// Apply a filter builder and delete matches.
200    pub fn filter<F, I>(self, f: F) -> Result<Response<E>, InternalError>
201    where
202        F: FnOnce(FilterDsl) -> I,
203        I: IntoFilterExpr,
204    {
205        let query = DeleteQuery::new().filter(f);
206        self.execute(query)
207    }
208
209    // ─────────────────────────────────────────────
210    // ENSURE HELPERS
211    // ─────────────────────────────────────────────
212
213    pub fn ensure_delete_one(self, pk: impl FieldValue) -> Result<(), InternalError> {
214        self.one(pk)?.require_one()?;
215        Ok(())
216    }
217
218    pub fn ensure_delete_any_by_pk<I, V>(self, pks: I) -> Result<(), InternalError>
219    where
220        I: IntoIterator<Item = V>,
221        V: FieldValue,
222    {
223        self.many(pks)?.require_some()?;
224
225        Ok(())
226    }
227
228    pub fn ensure_delete_any<I, V>(self, values: I) -> Result<(), InternalError>
229    where
230        I: IntoIterator<Item = V>,
231        V: FieldValue,
232    {
233        self.ensure_delete_any_by_pk(values)
234    }
235
236    // ─────────────────────────────────────────────
237    // EXECUTION
238    // ─────────────────────────────────────────────
239
240    pub fn explain(self, query: DeleteQuery) -> Result<QueryPlan, InternalError> {
241        QueryValidate::<E>::validate(&query)?;
242
243        Ok(plan_for::<E>(query.filter.as_ref()))
244    }
245
246    /// Execute a planner-based delete query.
247    ///
248    /// Note: index-planned deletes are best-effort and do not validate unique-index
249    /// invariants. Corrupt index entries may be skipped. Use `by_unique_index` for
250    /// strict unique-index semantics.
251    pub fn execute(self, query: DeleteQuery) -> Result<Response<E>, InternalError> {
252        QueryValidate::<E>::validate(&query)?;
253        let mut span = Span::<E>::new(ExecKind::Delete);
254
255        let plan = plan_for::<E>(query.filter.as_ref());
256
257        let limit = query
258            .limit
259            .as_ref()
260            .and_then(|l| l.limit)
261            .map(|l| l as usize);
262
263        let offset = query.limit.as_ref().map_or(0, |l| l.offset as usize);
264        let filter_simplified = query.filter.as_ref().map(|f| f.clone().simplify());
265
266        let mut acc = DeleteAccumulator::new(filter_simplified.as_ref(), offset, limit);
267
268        let mut scanned = 0u64;
269        scan_plan::<E, _>(&self.db, plan, |dk, entity| {
270            scanned = scanned.saturating_add(1);
271            if acc.should_stop(dk, entity) {
272                ControlFlow::Break(())
273            } else {
274                ControlFlow::Continue(())
275            }
276        })?;
277
278        sink::record(MetricsEvent::RowsScanned {
279            entity_path: E::PATH,
280            rows_scanned: scanned,
281        });
282
283        let mut res: Vec<(Key, E)> = Vec::with_capacity(acc.matches.len());
284        self.db.context::<E>().with_store_mut(|s| {
285            // Delete semantics: incremental and non-atomic across data/index stores.
286            // Acceptable: partial deletions remain if an error occurs mid-loop.
287            // Corruption risk: if index removal fails after data removal, indexes
288            // can retain orphaned entries. Retry-safe only if missing rows are ok.
289            for (dk, entity) in acc.matches {
290                let _unit = WriteUnit::new("delete_row");
291                s.remove(&dk);
292                if !E::INDEXES.is_empty() {
293                    self.remove_indexes(&entity)?;
294                }
295                res.push((dk.key(), entity));
296            }
297            Ok::<_, InternalError>(())
298        })??;
299
300        set_rows_from_len(&mut span, res.len());
301
302        Ok(Response(res))
303    }
304
305    fn remove_indexes(&self, entity: &E) -> Result<(), InternalError> {
306        for index in E::INDEXES {
307            let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
308            store.with_borrow_mut(|this| {
309                this.remove_index_entry(entity, index);
310            });
311        }
312        Ok(())
313    }
314
315    fn load_existing(
316        &self,
317        index: &'static IndexModel,
318        pk: E::PrimaryKey,
319    ) -> Result<(DataKey, E), InternalError> {
320        let data_key = DataKey::new::<E>(pk.into());
321        let bytes = self
322            .db
323            .context::<E>()
324            .with_store(|store| store.get(&data_key))?;
325
326        let Some(bytes) = bytes else {
327            return Err(ExecutorError::IndexCorrupted(
328                E::PATH.to_string(),
329                index.fields.join(", "),
330                1,
331            )
332            .into());
333        };
334
335        let entity = deserialize::<E>(&bytes)?;
336        Ok((data_key, entity))
337    }
338}