icydb_core/db/executor/
delete.rs

1use crate::{
2    Error, Key,
3    db::{
4        Db,
5        executor::{
6            FilterEvaluator,
7            plan::{plan_for, scan_plan, set_rows_from_len},
8        },
9        primitives::{FilterDsl, FilterExpr, FilterExt, IntoFilterExpr},
10        query::{DeleteQuery, QueryPlan, QueryValidate},
11        response::Response,
12        store::DataKey,
13    },
14    obs::metrics,
15    traits::{EntityKind, FieldValue},
16};
17use std::{marker::PhantomData, ops::ControlFlow};
18
19///
20/// DeleteAccumulator
21///
22
23struct DeleteAccumulator<'f, E> {
24    filter: Option<&'f FilterExpr>,
25    offset: usize,
26    skipped: usize,
27    limit: Option<usize>,
28    matches: Vec<(DataKey, E)>,
29}
30
31impl<'f, E: EntityKind> DeleteAccumulator<'f, E> {
32    fn new(filter: Option<&'f FilterExpr>, offset: usize, limit: Option<usize>) -> Self {
33        Self {
34            filter,
35            offset,
36            skipped: 0,
37            limit,
38            matches: Vec::with_capacity(limit.unwrap_or(0)),
39        }
40    }
41
42    fn limit_reached(&self) -> bool {
43        self.limit.is_some_and(|lim| self.matches.len() >= lim)
44    }
45
46    fn should_stop(&mut self, dk: DataKey, entity: E) -> bool {
47        if let Some(f) = self.filter
48            && !FilterEvaluator::new(&entity).eval(f)
49        {
50            return false;
51        }
52
53        if self.skipped < self.offset {
54            self.skipped += 1;
55            return false;
56        }
57
58        if self.limit_reached() {
59            return true;
60        }
61
62        self.matches.push((dk, entity));
63        false
64    }
65}
66
67///
68/// DeleteExecutor
69///
70
71#[derive(Clone, Copy)]
72pub struct DeleteExecutor<E: EntityKind> {
73    db: Db<E::Canister>,
74    debug: bool,
75    _marker: PhantomData<E>,
76}
77
78impl<E: EntityKind> DeleteExecutor<E> {
79    #[must_use]
80    pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
81        Self {
82            db,
83            debug,
84            _marker: PhantomData,
85        }
86    }
87
88    #[must_use]
89    pub const fn debug(mut self) -> Self {
90        self.debug = true;
91        self
92    }
93
94    // ─────────────────────────────────────────────
95    // PK-BASED HELPERS
96    // ─────────────────────────────────────────────
97
98    /// Delete a single row by primary key.
99    pub fn one(self, pk: impl FieldValue) -> Result<Response<E>, Error> {
100        let query = DeleteQuery::new().one::<E>(pk);
101        self.execute(query)
102    }
103
104    /// Delete the unit-key row.
105    pub fn only(self) -> Result<Response<E>, Error> {
106        let query = DeleteQuery::new().one::<E>(());
107        self.execute(query)
108    }
109
110    /// Delete multiple rows by primary keys.
111    pub fn many<I, V>(self, values: I) -> Result<Response<E>, Error>
112    where
113        I: IntoIterator<Item = V>,
114        V: FieldValue,
115    {
116        let query = DeleteQuery::new().many_by_field(E::PRIMARY_KEY, values);
117
118        self.execute(query)
119    }
120
121    // ─────────────────────────────────────────────
122    // GENERIC FIELD-BASED DELETE
123    // ─────────────────────────────────────────────
124
125    /// Delete a single row by an arbitrary field value.
126    pub fn one_by_field(
127        self,
128        field: impl AsRef<str>,
129        value: impl FieldValue,
130    ) -> Result<Response<E>, Error> {
131        let query = DeleteQuery::new().one_by_field(field, value);
132        self.execute(query)
133    }
134
135    /// Delete multiple rows by an arbitrary field.
136    pub fn many_by_field<I, V>(
137        self,
138        field: impl AsRef<str>,
139        values: I,
140    ) -> Result<Response<E>, Error>
141    where
142        I: IntoIterator<Item = V>,
143        V: FieldValue,
144    {
145        let query = DeleteQuery::new().many_by_field(field, values);
146        self.execute(query)
147    }
148
149    /// Delete all rows.
150    pub fn all(self) -> Result<Response<E>, Error> {
151        self.execute(DeleteQuery::new())
152    }
153
154    /// Apply a filter builder and delete matches.
155    pub fn filter<F, I>(self, f: F) -> Result<Response<E>, Error>
156    where
157        F: FnOnce(FilterDsl) -> I,
158        I: IntoFilterExpr,
159    {
160        let query = DeleteQuery::new().filter(f);
161        self.execute(query)
162    }
163
164    // ─────────────────────────────────────────────
165    // ENSURE HELPERS
166    // ─────────────────────────────────────────────
167
168    pub fn ensure_delete_one(self, pk: impl FieldValue) -> Result<(), Error> {
169        self.one(pk)?.require_one()?;
170        Ok(())
171    }
172
173    pub fn ensure_delete_any_by_pk<I, V>(self, pks: I) -> Result<(), Error>
174    where
175        I: IntoIterator<Item = V>,
176        V: FieldValue,
177    {
178        self.many(pks)?.require_some()?;
179
180        Ok(())
181    }
182
183    pub fn ensure_delete_any<I, V>(self, values: I) -> Result<(), Error>
184    where
185        I: IntoIterator<Item = V>,
186        V: FieldValue,
187    {
188        self.ensure_delete_any_by_pk(values)
189    }
190
191    // ─────────────────────────────────────────────
192    // EXECUTION
193    // ─────────────────────────────────────────────
194
195    pub fn explain(self, query: DeleteQuery) -> Result<QueryPlan, Error> {
196        QueryValidate::<E>::validate(&query)?;
197        Ok(plan_for::<E>(query.filter.as_ref()))
198    }
199
200    pub fn execute(self, query: DeleteQuery) -> Result<Response<E>, Error> {
201        QueryValidate::<E>::validate(&query)?;
202        let mut span = metrics::Span::<E>::new(metrics::ExecKind::Delete);
203
204        let plan = plan_for::<E>(query.filter.as_ref());
205
206        let limit = query
207            .limit
208            .as_ref()
209            .and_then(|l| l.limit)
210            .map(|l| l as usize);
211
212        let offset = query.limit.as_ref().map_or(0, |l| l.offset as usize);
213        let filter_simplified = query.filter.as_ref().map(|f| f.clone().simplify());
214
215        let mut acc = DeleteAccumulator::new(filter_simplified.as_ref(), offset, limit);
216
217        scan_plan::<E, _>(&self.db, plan, |dk, entity| {
218            if acc.should_stop(dk, entity) {
219                ControlFlow::Break(())
220            } else {
221                ControlFlow::Continue(())
222            }
223        })?;
224
225        let mut res: Vec<(Key, E)> = Vec::with_capacity(acc.matches.len());
226        self.db.context::<E>().with_store_mut(|s| {
227            for (dk, entity) in acc.matches {
228                s.remove(&dk);
229                if !E::INDEXES.is_empty() {
230                    self.remove_indexes(&entity)?;
231                }
232                res.push((dk.key(), entity));
233            }
234            Ok::<_, Error>(())
235        })??;
236
237        set_rows_from_len(&mut span, res.len());
238        Ok(Response(res))
239    }
240
241    fn remove_indexes(&self, entity: &E) -> Result<(), Error> {
242        for index in E::INDEXES {
243            let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
244            store.with_borrow_mut(|this| {
245                this.remove_index_entry(entity, index);
246            });
247        }
248        Ok(())
249    }
250}