icydb_core/db/executor/
delete.rs

1use crate::{
2    Error, Key,
3    db::{
4        Db,
5        executor::{
6            FilterEvaluator,
7            plan::{plan_for, scan_plan, set_rows_from_len},
8        },
9        primitives::{FilterDsl, FilterExpr, FilterExt, IntoFilterExpr},
10        query::{DeleteQuery, QueryPlan, QueryValidate},
11        response::Response,
12        store::DataKey,
13    },
14    obs::metrics,
15    traits::{EntityKind, FieldValue},
16    view::View,
17};
18use std::{marker::PhantomData, ops::ControlFlow};
19
20///
21/// DeleteAccumulator
22///
23/// collects matched rows for deletion while applying filter + offset/limit during iteration
24/// stops scanning once the window is satisfied.
25///
26
27struct DeleteAccumulator<'f, E> {
28    filter: Option<&'f FilterExpr>,
29    offset: usize,
30    skipped: usize,
31    limit: Option<usize>,
32    matches: Vec<(DataKey, E)>,
33}
34
35impl<'f, E: EntityKind> DeleteAccumulator<'f, E> {
36    fn new(filter: Option<&'f FilterExpr>, offset: usize, limit: Option<usize>) -> Self {
37        Self {
38            filter,
39            offset,
40            skipped: 0,
41            limit,
42            matches: Vec::with_capacity(limit.unwrap_or(0)),
43        }
44    }
45
46    fn limit_reached(&self) -> bool {
47        self.limit.is_some_and(|lim| self.matches.len() >= lim)
48    }
49
50    /// Returns true when the limit has been reached and iteration should stop.
51    fn should_stop(&mut self, dk: DataKey, entity: E) -> bool {
52        if let Some(f) = self.filter
53            && !FilterEvaluator::new(&entity).eval(f)
54        {
55            return false;
56        }
57
58        if self.skipped < self.offset {
59            self.skipped += 1;
60            return false;
61        }
62
63        if self.limit_reached() {
64            return true;
65        }
66
67        self.matches.push((dk, entity));
68        false
69    }
70}
71
72///
73/// DeleteExecutor
74///
75
76#[derive(Clone, Copy)]
77pub struct DeleteExecutor<E: EntityKind> {
78    db: Db<E::Canister>,
79    debug: bool,
80    _marker: PhantomData<E>,
81}
82
83impl<E: EntityKind> DeleteExecutor<E> {
84    #[must_use]
85    pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
86        Self {
87            db,
88            debug,
89            _marker: PhantomData,
90        }
91    }
92
93    // debug
94    #[must_use]
95    pub const fn debug(mut self) -> Self {
96        self.debug = true;
97        self
98    }
99
100    ///
101    /// SHORTCUT METHODS
102    ///
103
104    /// Delete a row by primary key and return its key.
105    pub fn one_key(self, value: impl FieldValue) -> Result<Key, Error> {
106        self.one(value)?.try_key()
107    }
108
109    /// Delete a row by primary key and return its entity.
110    pub fn one_entity(self, value: impl FieldValue) -> Result<E, Error> {
111        self.one(value)?.try_entity()
112    }
113
114    /// Delete a row by primary key and return its view.
115    pub fn one_view(self, value: impl FieldValue) -> Result<View<E>, Error> {
116        self.one(value)?.try_view()
117    }
118
119    ///
120    /// HELPER METHODS
121    ///
122
123    /// Delete a single matching row.
124    pub fn one(self, value: impl FieldValue) -> Result<Response<E>, Error> {
125        let query = DeleteQuery::new().one::<E>(value);
126        self.execute(query)
127    }
128
129    /// Delete the unit-key row.
130    pub fn only(self) -> Result<Response<E>, Error> {
131        let query = DeleteQuery::new().one::<E>(());
132        self.execute(query)
133    }
134
135    /// Delete multiple rows by primary keys.
136    pub fn many(
137        self,
138        values: impl IntoIterator<Item = impl FieldValue>,
139    ) -> Result<Response<E>, Error> {
140        let query = DeleteQuery::new().many::<E>(values);
141        self.execute(query)
142    }
143
144    /// Delete all rows.
145    pub fn all(self) -> Result<Response<E>, Error> {
146        let query = DeleteQuery::new();
147        self.execute(query)
148    }
149
150    /// Apply a filter builder and delete matches.
151    pub fn filter<F, I>(self, f: F) -> Result<Response<E>, Error>
152    where
153        F: FnOnce(FilterDsl) -> I,
154        I: IntoFilterExpr,
155    {
156        let query = DeleteQuery::new().filter(f);
157        self.execute(query)
158    }
159
160    ///
161    /// EXECUTION METHODS
162    ///
163
164    // explain
165    /// Validate and return the query plan without executing.
166    pub fn explain(self, query: DeleteQuery) -> Result<QueryPlan, Error> {
167        QueryValidate::<E>::validate(&query)?;
168
169        Ok(plan_for::<E>(query.filter.as_ref()))
170    }
171
172    // execute
173    /// Execute a delete query and return the removed rows.
174    pub fn execute(self, query: DeleteQuery) -> Result<Response<E>, Error> {
175        QueryValidate::<E>::validate(&query)?;
176        let mut span = metrics::Span::<E>::new(metrics::ExecKind::Delete);
177
178        let plan = plan_for::<E>(query.filter.as_ref());
179
180        // query prep
181        let limit = query
182            .limit
183            .as_ref()
184            .and_then(|l| l.limit)
185            .map(|l| l as usize);
186        let offset = query.limit.as_ref().map_or(0_usize, |l| l.offset as usize);
187        let filter_simplified = query.filter.as_ref().map(|f| f.clone().simplify());
188
189        let mut acc = DeleteAccumulator::new(filter_simplified.as_ref(), offset, limit);
190
191        scan_plan::<E, _>(&self.db, plan, |dk, entity| {
192            if acc.should_stop(dk, entity) {
193                ControlFlow::Break(())
194            } else {
195                ControlFlow::Continue(())
196            }
197        })?;
198
199        // Apply deletions + index teardown
200        let mut res: Vec<(Key, E)> = Vec::with_capacity(acc.matches.len());
201        self.db.context::<E>().with_store_mut(|s| {
202            for (dk, entity) in acc.matches {
203                s.remove(&dk);
204                if !E::INDEXES.is_empty() {
205                    self.remove_indexes(&entity)?;
206                }
207                res.push((dk.key(), entity));
208            }
209
210            Ok::<_, Error>(())
211        })??;
212
213        set_rows_from_len(&mut span, res.len());
214
215        Ok(Response(res))
216    }
217
218    // remove_indexes
219    fn remove_indexes(&self, entity: &E) -> Result<(), Error> {
220        for index in E::INDEXES {
221            let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
222
223            store.with_borrow_mut(|this| {
224                this.remove_index_entry(entity, index);
225            });
226        }
227
228        Ok(())
229    }
230}