icydb_core/db/executor/
delete.rs

1use crate::{
2    Error, Key,
3    db::{
4        Db,
5        executor::{
6            FilterEvaluator,
7            plan::{plan_for, scan_plan, set_rows_from_len},
8        },
9        primitives::{FilterDsl, FilterExpr, FilterExt, IntoFilterExpr},
10        query::{DeleteQuery, QueryPlan, QueryValidate},
11        response::Response,
12        store::DataKey,
13    },
14    obs::metrics,
15    traits::{EntityKind, FieldValue},
16};
17use std::{marker::PhantomData, ops::ControlFlow};
18
19///
20/// DeleteAccumulator
21///
22/// collects matched rows for deletion while applying filter + offset/limit during iteration
23/// stops scanning once the window is satisfied.
24///
25
26struct DeleteAccumulator<'f, E> {
27    filter: Option<&'f FilterExpr>,
28    offset: usize,
29    skipped: usize,
30    limit: Option<usize>,
31    matches: Vec<(DataKey, E)>,
32}
33
34impl<'f, E: EntityKind> DeleteAccumulator<'f, E> {
35    fn new(filter: Option<&'f FilterExpr>, offset: usize, limit: Option<usize>) -> Self {
36        Self {
37            filter,
38            offset,
39            skipped: 0,
40            limit,
41            matches: Vec::with_capacity(limit.unwrap_or(0)),
42        }
43    }
44
45    fn limit_reached(&self) -> bool {
46        self.limit.is_some_and(|lim| self.matches.len() >= lim)
47    }
48
49    /// Returns true when the limit has been reached and iteration should stop.
50    fn should_stop(&mut self, dk: DataKey, entity: E) -> bool {
51        if let Some(f) = self.filter
52            && !FilterEvaluator::new(&entity).eval(f)
53        {
54            return false;
55        }
56
57        if self.skipped < self.offset {
58            self.skipped += 1;
59            return false;
60        }
61
62        if self.limit_reached() {
63            return true;
64        }
65
66        self.matches.push((dk, entity));
67        false
68    }
69}
70
71///
72/// DeleteExecutor
73///
74
75#[derive(Clone, Copy)]
76pub struct DeleteExecutor<E: EntityKind> {
77    db: Db<E::Canister>,
78    debug: bool,
79    _marker: PhantomData<E>,
80}
81
82impl<E: EntityKind> DeleteExecutor<E> {
83    #[must_use]
84    pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
85        Self {
86            db,
87            debug,
88            _marker: PhantomData,
89        }
90    }
91
92    // debug
93    #[must_use]
94    pub const fn debug(mut self) -> Self {
95        self.debug = true;
96        self
97    }
98
99    ///
100    /// HELPER METHODS
101    ///
102
103    /// Delete a single matching row.
104    pub fn one(self, value: impl FieldValue) -> Result<Response<E>, Error> {
105        let query = DeleteQuery::new().one::<E>(value);
106        self.execute(query)
107    }
108
109    /// Delete the unit-key row.
110    pub fn only(self) -> Result<Response<E>, Error> {
111        let query = DeleteQuery::new().one::<E>(());
112        self.execute(query)
113    }
114
115    /// Delete multiple rows by primary keys.
116    pub fn many(
117        self,
118        values: impl IntoIterator<Item = impl FieldValue>,
119    ) -> Result<Response<E>, Error> {
120        let query = DeleteQuery::new().many::<E>(values);
121        self.execute(query)
122    }
123
124    /// Delete all rows.
125    pub fn all(self) -> Result<Response<E>, Error> {
126        let query = DeleteQuery::new();
127        self.execute(query)
128    }
129
130    /// Apply a filter builder and delete matches.
131    pub fn filter<F, I>(self, f: F) -> Result<Response<E>, Error>
132    where
133        F: FnOnce(FilterDsl) -> I,
134        I: IntoFilterExpr,
135    {
136        let query = DeleteQuery::new().filter(f);
137        self.execute(query)
138    }
139
140    ///
141    /// EXECUTION METHODS
142    ///
143
144    pub fn ensure_delete_one(self, pk: impl FieldValue) -> Result<(), Error> {
145        self.one(pk)?.require_one()?;
146
147        Ok(())
148    }
149
150    pub fn ensure_delete_any(
151        self,
152        pks: impl IntoIterator<Item = impl FieldValue>,
153    ) -> Result<(), Error> {
154        self.many(pks)?.require_some()?;
155
156        Ok(())
157    }
158
159    /// Validate and return the query plan without executing.
160    pub fn explain(self, query: DeleteQuery) -> Result<QueryPlan, Error> {
161        QueryValidate::<E>::validate(&query)?;
162
163        Ok(plan_for::<E>(query.filter.as_ref()))
164    }
165
166    /// Execute a delete query and return the removed rows.
167    pub fn execute(self, query: DeleteQuery) -> Result<Response<E>, Error> {
168        QueryValidate::<E>::validate(&query)?;
169        let mut span = metrics::Span::<E>::new(metrics::ExecKind::Delete);
170
171        let plan = plan_for::<E>(query.filter.as_ref());
172
173        // query prep
174        let limit = query
175            .limit
176            .as_ref()
177            .and_then(|l| l.limit)
178            .map(|l| l as usize);
179        let offset = query.limit.as_ref().map_or(0_usize, |l| l.offset as usize);
180        let filter_simplified = query.filter.as_ref().map(|f| f.clone().simplify());
181
182        let mut acc = DeleteAccumulator::new(filter_simplified.as_ref(), offset, limit);
183
184        scan_plan::<E, _>(&self.db, plan, |dk, entity| {
185            if acc.should_stop(dk, entity) {
186                ControlFlow::Break(())
187            } else {
188                ControlFlow::Continue(())
189            }
190        })?;
191
192        // Apply deletions + index teardown
193        let mut res: Vec<(Key, E)> = Vec::with_capacity(acc.matches.len());
194        self.db.context::<E>().with_store_mut(|s| {
195            for (dk, entity) in acc.matches {
196                s.remove(&dk);
197                if !E::INDEXES.is_empty() {
198                    self.remove_indexes(&entity)?;
199                }
200                res.push((dk.key(), entity));
201            }
202
203            Ok::<_, Error>(())
204        })??;
205
206        set_rows_from_len(&mut span, res.len());
207
208        Ok(Response(res))
209    }
210
211    // remove_indexes
212    fn remove_indexes(&self, entity: &E) -> Result<(), Error> {
213        for index in E::INDEXES {
214            let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
215
216            store.with_borrow_mut(|this| {
217                this.remove_index_entry(entity, index);
218            });
219        }
220
221        Ok(())
222    }
223}