icydb_core/db/executor/
delete.rs

1use crate::{
2    Error, IndexSpec, Key,
3    db::{
4        Db,
5        executor::{
6            ExecutorError, FilterEvaluator, UniqueIndexHandle,
7            plan::{plan_for, scan_plan, set_rows_from_len},
8            resolve_unique_pk,
9        },
10        primitives::{FilterDsl, FilterExpr, FilterExt, IntoFilterExpr},
11        query::{DeleteQuery, QueryPlan, QueryValidate},
12        response::Response,
13        store::DataKey,
14    },
15    deserialize,
16    obs::metrics,
17    traits::{EntityKind, FieldValue, FromKey},
18    visitor::sanitize,
19};
20use std::{marker::PhantomData, ops::ControlFlow};
21
22///
23/// DeleteAccumulator
24///
25
26struct DeleteAccumulator<'f, E> {
27    filter: Option<&'f FilterExpr>,
28    offset: usize,
29    skipped: usize,
30    limit: Option<usize>,
31    matches: Vec<(DataKey, E)>,
32}
33
34impl<'f, E: EntityKind> DeleteAccumulator<'f, E> {
35    fn new(filter: Option<&'f FilterExpr>, offset: usize, limit: Option<usize>) -> Self {
36        Self {
37            filter,
38            offset,
39            skipped: 0,
40            limit,
41            matches: Vec::with_capacity(limit.unwrap_or(0)),
42        }
43    }
44
45    fn limit_reached(&self) -> bool {
46        self.limit.is_some_and(|lim| self.matches.len() >= lim)
47    }
48
49    fn should_stop(&mut self, dk: DataKey, entity: E) -> bool {
50        if let Some(f) = self.filter
51            && !FilterEvaluator::new(&entity).eval(f)
52        {
53            return false;
54        }
55
56        if self.skipped < self.offset {
57            self.skipped += 1;
58            return false;
59        }
60
61        if self.limit_reached() {
62            return true;
63        }
64
65        self.matches.push((dk, entity));
66        false
67    }
68}
69
70///
71/// DeleteExecutor
72///
73
74#[derive(Clone, Copy)]
75pub struct DeleteExecutor<E: EntityKind> {
76    db: Db<E::Canister>,
77    debug: bool,
78    _marker: PhantomData<E>,
79}
80
81impl<E: EntityKind> DeleteExecutor<E> {
82    #[must_use]
83    pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
84        Self {
85            db,
86            debug,
87            _marker: PhantomData,
88        }
89    }
90
91    #[must_use]
92    pub const fn debug(mut self) -> Self {
93        self.debug = true;
94        self
95    }
96
97    // ─────────────────────────────────────────────
98    // PK-BASED HELPERS
99    // ─────────────────────────────────────────────
100
101    /// Delete a single row by primary key.
102    pub fn one(self, pk: impl FieldValue) -> Result<Response<E>, Error> {
103        let query = DeleteQuery::new().one::<E>(pk);
104        self.execute(query)
105    }
106
107    /// Delete the unit-key row.
108    pub fn only(self) -> Result<Response<E>, Error> {
109        let query = DeleteQuery::new().one::<E>(());
110        self.execute(query)
111    }
112
113    /// Delete multiple rows by primary keys.
114    pub fn many<I, V>(self, values: I) -> Result<Response<E>, Error>
115    where
116        I: IntoIterator<Item = V>,
117        V: FieldValue,
118    {
119        let query = DeleteQuery::new().many_by_field(E::PRIMARY_KEY, values);
120
121        self.execute(query)
122    }
123
124    // ─────────────────────────────────────────────
125    // UNIQUE INDEX DELETE
126    // ─────────────────────────────────────────────
127
128    /// Delete a single row using a unique index handle.
129    pub fn by_unique_index(self, index: UniqueIndexHandle, entity: E) -> Result<Response<E>, Error>
130    where
131        E::PrimaryKey: FromKey,
132    {
133        let mut span = metrics::Span::<E>::new(metrics::ExecKind::Delete);
134        let index = index.index();
135        let mut lookup = entity;
136        sanitize(&mut lookup);
137
138        let Some(pk) = resolve_unique_pk::<E>(&self.db, index, &lookup)? else {
139            set_rows_from_len(&mut span, 0);
140            return Ok(Response(Vec::new()));
141        };
142
143        let (dk, stored) = self.load_existing(index, pk)?;
144
145        self.db.context::<E>().with_store_mut(|s| {
146            s.remove(&dk);
147            if !E::INDEXES.is_empty() {
148                self.remove_indexes(&stored)?;
149            }
150            Ok::<_, Error>(())
151        })??;
152
153        set_rows_from_len(&mut span, 1);
154        Ok(Response(vec![(dk.key(), stored)]))
155    }
156
157    // ─────────────────────────────────────────────
158    // GENERIC FIELD-BASED DELETE
159    // ─────────────────────────────────────────────
160
161    /// Delete a single row by an arbitrary field value.
162    pub fn one_by_field(
163        self,
164        field: impl AsRef<str>,
165        value: impl FieldValue,
166    ) -> Result<Response<E>, Error> {
167        let query = DeleteQuery::new().one_by_field(field, value);
168        self.execute(query)
169    }
170
171    /// Delete multiple rows by an arbitrary field.
172    pub fn many_by_field<I, V>(
173        self,
174        field: impl AsRef<str>,
175        values: I,
176    ) -> Result<Response<E>, Error>
177    where
178        I: IntoIterator<Item = V>,
179        V: FieldValue,
180    {
181        let query = DeleteQuery::new().many_by_field(field, values);
182        self.execute(query)
183    }
184
185    /// Delete all rows.
186    pub fn all(self) -> Result<Response<E>, Error> {
187        self.execute(DeleteQuery::new())
188    }
189
190    /// Apply a filter builder and delete matches.
191    pub fn filter<F, I>(self, f: F) -> Result<Response<E>, Error>
192    where
193        F: FnOnce(FilterDsl) -> I,
194        I: IntoFilterExpr,
195    {
196        let query = DeleteQuery::new().filter(f);
197        self.execute(query)
198    }
199
200    // ─────────────────────────────────────────────
201    // ENSURE HELPERS
202    // ─────────────────────────────────────────────
203
204    pub fn ensure_delete_one(self, pk: impl FieldValue) -> Result<(), Error> {
205        self.one(pk)?.require_one()?;
206        Ok(())
207    }
208
209    pub fn ensure_delete_any_by_pk<I, V>(self, pks: I) -> Result<(), Error>
210    where
211        I: IntoIterator<Item = V>,
212        V: FieldValue,
213    {
214        self.many(pks)?.require_some()?;
215
216        Ok(())
217    }
218
219    pub fn ensure_delete_any<I, V>(self, values: I) -> Result<(), Error>
220    where
221        I: IntoIterator<Item = V>,
222        V: FieldValue,
223    {
224        self.ensure_delete_any_by_pk(values)
225    }
226
227    // ─────────────────────────────────────────────
228    // EXECUTION
229    // ─────────────────────────────────────────────
230
231    pub fn explain(self, query: DeleteQuery) -> Result<QueryPlan, Error> {
232        QueryValidate::<E>::validate(&query)?;
233        Ok(plan_for::<E>(query.filter.as_ref()))
234    }
235
236    /// Execute a planner-based delete query.
237    ///
238    /// Note: index-planned deletes are best-effort and do not validate unique-index
239    /// invariants. Corrupt index entries may be skipped. Use `by_unique_index` for
240    /// strict unique-index semantics.
241    pub fn execute(self, query: DeleteQuery) -> Result<Response<E>, Error> {
242        QueryValidate::<E>::validate(&query)?;
243        let mut span = metrics::Span::<E>::new(metrics::ExecKind::Delete);
244
245        let plan = plan_for::<E>(query.filter.as_ref());
246
247        let limit = query
248            .limit
249            .as_ref()
250            .and_then(|l| l.limit)
251            .map(|l| l as usize);
252
253        let offset = query.limit.as_ref().map_or(0, |l| l.offset as usize);
254        let filter_simplified = query.filter.as_ref().map(|f| f.clone().simplify());
255
256        let mut acc = DeleteAccumulator::new(filter_simplified.as_ref(), offset, limit);
257
258        let mut scanned = 0u64;
259        scan_plan::<E, _>(&self.db, plan, |dk, entity| {
260            scanned = scanned.saturating_add(1);
261            if acc.should_stop(dk, entity) {
262                ControlFlow::Break(())
263            } else {
264                ControlFlow::Continue(())
265            }
266        })?;
267
268        metrics::record_rows_scanned_for::<E>(scanned);
269
270        let mut res: Vec<(Key, E)> = Vec::with_capacity(acc.matches.len());
271        self.db.context::<E>().with_store_mut(|s| {
272            for (dk, entity) in acc.matches {
273                s.remove(&dk);
274                if !E::INDEXES.is_empty() {
275                    self.remove_indexes(&entity)?;
276                }
277                res.push((dk.key(), entity));
278            }
279            Ok::<_, Error>(())
280        })??;
281
282        set_rows_from_len(&mut span, res.len());
283
284        Ok(Response(res))
285    }
286
287    fn remove_indexes(&self, entity: &E) -> Result<(), Error> {
288        for index in E::INDEXES {
289            let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
290            store.with_borrow_mut(|this| {
291                this.remove_index_entry(entity, index);
292            });
293        }
294        Ok(())
295    }
296
297    fn load_existing(
298        &self,
299        index: &'static IndexSpec,
300        pk: E::PrimaryKey,
301    ) -> Result<(DataKey, E), Error> {
302        let data_key = DataKey::new::<E>(pk.into());
303        let bytes = self
304            .db
305            .context::<E>()
306            .with_store(|store| store.get(&data_key))?;
307
308        let Some(bytes) = bytes else {
309            return Err(ExecutorError::IndexCorrupted(
310                E::PATH.to_string(),
311                index.fields.join(", "),
312                1,
313            )
314            .into());
315        };
316
317        let entity = deserialize::<E>(&bytes)?;
318        Ok((data_key, entity))
319    }
320}