icydb_core/db/executor/
delete.rs

1use crate::{
2    Error, IndexSpec, Key,
3    db::{
4        Db,
5        executor::{
6            ExecutorError, FilterEvaluator, UniqueIndexHandle,
7            plan::{plan_for, scan_plan, set_rows_from_len},
8            resolve_unique_pk,
9        },
10        primitives::{FilterDsl, FilterExpr, FilterExt, IntoFilterExpr},
11        query::{DeleteQuery, QueryPlan, QueryValidate},
12        response::Response,
13        store::DataKey,
14    },
15    deserialize,
16    obs::metrics,
17    sanitize,
18    traits::{EntityKind, FieldValue, FromKey},
19};
20use std::{marker::PhantomData, ops::ControlFlow};
21
22///
23/// DeleteAccumulator
24///
25
26struct DeleteAccumulator<'f, E> {
27    filter: Option<&'f FilterExpr>,
28    offset: usize,
29    skipped: usize,
30    limit: Option<usize>,
31    matches: Vec<(DataKey, E)>,
32}
33
34impl<'f, E: EntityKind> DeleteAccumulator<'f, E> {
35    fn new(filter: Option<&'f FilterExpr>, offset: usize, limit: Option<usize>) -> Self {
36        Self {
37            filter,
38            offset,
39            skipped: 0,
40            limit,
41            matches: Vec::with_capacity(limit.unwrap_or(0)),
42        }
43    }
44
45    fn limit_reached(&self) -> bool {
46        self.limit.is_some_and(|lim| self.matches.len() >= lim)
47    }
48
49    fn should_stop(&mut self, dk: DataKey, entity: E) -> bool {
50        if let Some(f) = self.filter
51            && !FilterEvaluator::new(&entity).eval(f)
52        {
53            return false;
54        }
55
56        if self.skipped < self.offset {
57            self.skipped += 1;
58            return false;
59        }
60
61        if self.limit_reached() {
62            return true;
63        }
64
65        self.matches.push((dk, entity));
66        false
67    }
68}
69
70///
71/// DeleteExecutor
72///
73
74#[derive(Clone, Copy)]
75pub struct DeleteExecutor<E: EntityKind> {
76    db: Db<E::Canister>,
77    debug: bool,
78    _marker: PhantomData<E>,
79}
80
81impl<E: EntityKind> DeleteExecutor<E> {
82    #[must_use]
83    pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
84        Self {
85            db,
86            debug,
87            _marker: PhantomData,
88        }
89    }
90
91    #[must_use]
92    pub const fn debug(mut self) -> Self {
93        self.debug = true;
94        self
95    }
96
97    // ─────────────────────────────────────────────
98    // PK-BASED HELPERS
99    // ─────────────────────────────────────────────
100
101    /// Delete a single row by primary key.
102    pub fn one(self, pk: impl FieldValue) -> Result<Response<E>, Error> {
103        let query = DeleteQuery::new().one::<E>(pk);
104        self.execute(query)
105    }
106
107    /// Delete the unit-key row.
108    pub fn only(self) -> Result<Response<E>, Error> {
109        let query = DeleteQuery::new().one::<E>(());
110        self.execute(query)
111    }
112
113    /// Delete multiple rows by primary keys.
114    pub fn many<I, V>(self, values: I) -> Result<Response<E>, Error>
115    where
116        I: IntoIterator<Item = V>,
117        V: FieldValue,
118    {
119        let query = DeleteQuery::new().many_by_field(E::PRIMARY_KEY, values);
120
121        self.execute(query)
122    }
123
124    // ─────────────────────────────────────────────
125    // UNIQUE INDEX DELETE
126    // ─────────────────────────────────────────────
127
128    /// Delete a single row using a unique index handle.
129    pub fn by_unique_index(self, index: UniqueIndexHandle, entity: E) -> Result<Response<E>, Error>
130    where
131        E::PrimaryKey: FromKey,
132    {
133        let mut span = metrics::Span::<E>::new(metrics::ExecKind::Delete);
134        let index = index.index();
135        let mut lookup = entity;
136        sanitize(&mut lookup)?;
137
138        let Some(pk) = resolve_unique_pk::<E>(&self.db, index, &lookup)? else {
139            set_rows_from_len(&mut span, 0);
140            return Ok(Response(Vec::new()));
141        };
142
143        let (dk, stored) = self.load_existing(index, pk)?;
144
145        self.db.context::<E>().with_store_mut(|s| {
146            s.remove(&dk);
147            if !E::INDEXES.is_empty() {
148                self.remove_indexes(&stored)?;
149            }
150            Ok::<_, Error>(())
151        })??;
152
153        set_rows_from_len(&mut span, 1);
154        Ok(Response(vec![(dk.key(), stored)]))
155    }
156
157    // ─────────────────────────────────────────────
158    // GENERIC FIELD-BASED DELETE
159    // ─────────────────────────────────────────────
160
161    /// Delete a single row by an arbitrary field value.
162    pub fn one_by_field(
163        self,
164        field: impl AsRef<str>,
165        value: impl FieldValue,
166    ) -> Result<Response<E>, Error> {
167        let query = DeleteQuery::new().one_by_field(field, value);
168        self.execute(query)
169    }
170
171    /// Delete multiple rows by an arbitrary field.
172    pub fn many_by_field<I, V>(
173        self,
174        field: impl AsRef<str>,
175        values: I,
176    ) -> Result<Response<E>, Error>
177    where
178        I: IntoIterator<Item = V>,
179        V: FieldValue,
180    {
181        let query = DeleteQuery::new().many_by_field(field, values);
182        self.execute(query)
183    }
184
185    /// Delete all rows.
186    pub fn all(self) -> Result<Response<E>, Error> {
187        self.execute(DeleteQuery::new())
188    }
189
190    /// Apply a filter builder and delete matches.
191    pub fn filter<F, I>(self, f: F) -> Result<Response<E>, Error>
192    where
193        F: FnOnce(FilterDsl) -> I,
194        I: IntoFilterExpr,
195    {
196        let query = DeleteQuery::new().filter(f);
197        self.execute(query)
198    }
199
200    // ─────────────────────────────────────────────
201    // ENSURE HELPERS
202    // ─────────────────────────────────────────────
203
204    pub fn ensure_delete_one(self, pk: impl FieldValue) -> Result<(), Error> {
205        self.one(pk)?.require_one()?;
206        Ok(())
207    }
208
209    pub fn ensure_delete_any_by_pk<I, V>(self, pks: I) -> Result<(), Error>
210    where
211        I: IntoIterator<Item = V>,
212        V: FieldValue,
213    {
214        self.many(pks)?.require_some()?;
215
216        Ok(())
217    }
218
219    pub fn ensure_delete_any<I, V>(self, values: I) -> Result<(), Error>
220    where
221        I: IntoIterator<Item = V>,
222        V: FieldValue,
223    {
224        self.ensure_delete_any_by_pk(values)
225    }
226
227    // ─────────────────────────────────────────────
228    // EXECUTION
229    // ─────────────────────────────────────────────
230
231    pub fn explain(self, query: DeleteQuery) -> Result<QueryPlan, Error> {
232        QueryValidate::<E>::validate(&query)?;
233
234        Ok(plan_for::<E>(query.filter.as_ref()))
235    }
236
237    /// Execute a planner-based delete query.
238    ///
239    /// Note: index-planned deletes are best-effort and do not validate unique-index
240    /// invariants. Corrupt index entries may be skipped. Use `by_unique_index` for
241    /// strict unique-index semantics.
242    pub fn execute(self, query: DeleteQuery) -> Result<Response<E>, Error> {
243        QueryValidate::<E>::validate(&query)?;
244        let mut span = metrics::Span::<E>::new(metrics::ExecKind::Delete);
245
246        let plan = plan_for::<E>(query.filter.as_ref());
247
248        let limit = query
249            .limit
250            .as_ref()
251            .and_then(|l| l.limit)
252            .map(|l| l as usize);
253
254        let offset = query.limit.as_ref().map_or(0, |l| l.offset as usize);
255        let filter_simplified = query.filter.as_ref().map(|f| f.clone().simplify());
256
257        let mut acc = DeleteAccumulator::new(filter_simplified.as_ref(), offset, limit);
258
259        let mut scanned = 0u64;
260        scan_plan::<E, _>(&self.db, plan, |dk, entity| {
261            scanned = scanned.saturating_add(1);
262            if acc.should_stop(dk, entity) {
263                ControlFlow::Break(())
264            } else {
265                ControlFlow::Continue(())
266            }
267        })?;
268
269        metrics::record_rows_scanned_for::<E>(scanned);
270
271        let mut res: Vec<(Key, E)> = Vec::with_capacity(acc.matches.len());
272        self.db.context::<E>().with_store_mut(|s| {
273            for (dk, entity) in acc.matches {
274                s.remove(&dk);
275                if !E::INDEXES.is_empty() {
276                    self.remove_indexes(&entity)?;
277                }
278                res.push((dk.key(), entity));
279            }
280            Ok::<_, Error>(())
281        })??;
282
283        set_rows_from_len(&mut span, res.len());
284
285        Ok(Response(res))
286    }
287
288    fn remove_indexes(&self, entity: &E) -> Result<(), Error> {
289        for index in E::INDEXES {
290            let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
291            store.with_borrow_mut(|this| {
292                this.remove_index_entry(entity, index);
293            });
294        }
295        Ok(())
296    }
297
298    fn load_existing(
299        &self,
300        index: &'static IndexSpec,
301        pk: E::PrimaryKey,
302    ) -> Result<(DataKey, E), Error> {
303        let data_key = DataKey::new::<E>(pk.into());
304        let bytes = self
305            .db
306            .context::<E>()
307            .with_store(|store| store.get(&data_key))?;
308
309        let Some(bytes) = bytes else {
310            return Err(ExecutorError::IndexCorrupted(
311                E::PATH.to_string(),
312                index.fields.join(", "),
313                1,
314            )
315            .into());
316        };
317
318        let entity = deserialize::<E>(&bytes)?;
319        Ok((data_key, entity))
320    }
321}