icydb_core/db/executor/
delete.rs

1use crate::{
2    Error, Key,
3    db::{
4        Db,
5        executor::{
6            FilterEvaluator,
7            plan::{plan_for, scan_plan, set_rows_from_len},
8        },
9        primitives::{FilterDsl, FilterExpr, FilterExt, IntoFilterExpr},
10        query::{DeleteQuery, QueryPlan, QueryValidate},
11        response::Response,
12        store::DataKey,
13    },
14    obs::metrics,
15    traits::{EntityKind, FieldValue},
16};
17use std::{marker::PhantomData, ops::ControlFlow};
18
19///
20/// DeleteAccumulator
21///
22/// collects matched rows for deletion while applying filter + offset/limit during iteration
23/// stops scanning once the window is satisfied.
24///
25
26struct DeleteAccumulator<'f, E> {
27    filter: Option<&'f FilterExpr>,
28    offset: usize,
29    skipped: usize,
30    limit: Option<usize>,
31    matches: Vec<(DataKey, E)>,
32}
33
34impl<'f, E: EntityKind> DeleteAccumulator<'f, E> {
35    fn new(filter: Option<&'f FilterExpr>, offset: usize, limit: Option<usize>) -> Self {
36        Self {
37            filter,
38            offset,
39            skipped: 0,
40            limit,
41            matches: Vec::with_capacity(limit.unwrap_or(0)),
42        }
43    }
44
45    fn limit_reached(&self) -> bool {
46        self.limit.is_some_and(|lim| self.matches.len() >= lim)
47    }
48
49    /// Returns true when the limit has been reached and iteration should stop.
50    fn should_stop(&mut self, dk: DataKey, entity: E) -> bool {
51        if let Some(f) = self.filter
52            && !FilterEvaluator::new(&entity).eval(f)
53        {
54            return false;
55        }
56
57        if self.skipped < self.offset {
58            self.skipped += 1;
59            return false;
60        }
61
62        if self.limit_reached() {
63            return true;
64        }
65
66        self.matches.push((dk, entity));
67        false
68    }
69}
70
71///
72/// DeleteExecutor
73///
74
75#[derive(Clone, Copy)]
76pub struct DeleteExecutor<E: EntityKind> {
77    db: Db<E::Canister>,
78    debug: bool,
79    _marker: PhantomData<E>,
80}
81
82impl<E: EntityKind> DeleteExecutor<E> {
83    #[must_use]
84    pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
85        Self {
86            db,
87            debug,
88            _marker: PhantomData,
89        }
90    }
91
92    // debug
93    #[must_use]
94    pub const fn debug(mut self) -> Self {
95        self.debug = true;
96        self
97    }
98
99    ///
100    /// HELPER METHODS
101    ///
102
103    /// Delete a single matching row.
104    pub fn one(self, value: impl FieldValue) -> Result<Response<E>, Error> {
105        let query = DeleteQuery::new().one::<E>(value);
106        self.execute(query)
107    }
108
109    /// Delete the unit-key row.
110    pub fn only(self) -> Result<Response<E>, Error> {
111        let query = DeleteQuery::new().one::<E>(());
112        self.execute(query)
113    }
114
115    /// Delete multiple rows by primary keys.
116    pub fn many(
117        self,
118        values: impl IntoIterator<Item = impl FieldValue>,
119    ) -> Result<Response<E>, Error> {
120        let query = DeleteQuery::new().many::<E>(values);
121        self.execute(query)
122    }
123
124    /// Delete all rows.
125    pub fn all(self) -> Result<Response<E>, Error> {
126        let query = DeleteQuery::new();
127        self.execute(query)
128    }
129
130    /// Apply a filter builder and delete matches.
131    pub fn filter<F, I>(self, f: F) -> Result<Response<E>, Error>
132    where
133        F: FnOnce(FilterDsl) -> I,
134        I: IntoFilterExpr,
135    {
136        let query = DeleteQuery::new().filter(f);
137        self.execute(query)
138    }
139
140    ///
141    /// EXECUTION METHODS
142    ///
143
144    // explain
145    /// Validate and return the query plan without executing.
146    pub fn explain(self, query: DeleteQuery) -> Result<QueryPlan, Error> {
147        QueryValidate::<E>::validate(&query)?;
148
149        Ok(plan_for::<E>(query.filter.as_ref()))
150    }
151
152    // execute
153    /// Execute a delete query and return the removed rows.
154    pub fn execute(self, query: DeleteQuery) -> Result<Response<E>, Error> {
155        QueryValidate::<E>::validate(&query)?;
156        let mut span = metrics::Span::<E>::new(metrics::ExecKind::Delete);
157
158        let plan = plan_for::<E>(query.filter.as_ref());
159
160        // query prep
161        let limit = query
162            .limit
163            .as_ref()
164            .and_then(|l| l.limit)
165            .map(|l| l as usize);
166        let offset = query.limit.as_ref().map_or(0_usize, |l| l.offset as usize);
167        let filter_simplified = query.filter.as_ref().map(|f| f.clone().simplify());
168
169        let mut acc = DeleteAccumulator::new(filter_simplified.as_ref(), offset, limit);
170
171        scan_plan::<E, _>(&self.db, plan, |dk, entity| {
172            if acc.should_stop(dk, entity) {
173                ControlFlow::Break(())
174            } else {
175                ControlFlow::Continue(())
176            }
177        })?;
178
179        // Apply deletions + index teardown
180        let mut res: Vec<(Key, E)> = Vec::with_capacity(acc.matches.len());
181        self.db.context::<E>().with_store_mut(|s| {
182            for (dk, entity) in acc.matches {
183                s.remove(&dk);
184                if !E::INDEXES.is_empty() {
185                    self.remove_indexes(&entity)?;
186                }
187                res.push((dk.key(), entity));
188            }
189
190            Ok::<_, Error>(())
191        })??;
192
193        set_rows_from_len(&mut span, res.len());
194
195        Ok(Response(res))
196    }
197
198    // remove_indexes
199    fn remove_indexes(&self, entity: &E) -> Result<(), Error> {
200        for index in E::INDEXES {
201            let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
202
203            store.with_borrow_mut(|this| {
204                this.remove_index_entry(entity, index);
205            });
206        }
207
208        Ok(())
209    }
210}