icydb_core/db/executor/
delete.rs1use crate::{
2 Error, Key,
3 db::{
4 Db,
5 executor::{
6 FilterEvaluator,
7 plan::{plan_for, scan_plan, set_rows_from_len},
8 },
9 primitives::{FilterDsl, FilterExpr, FilterExt, IntoFilterExpr},
10 query::{DeleteQuery, QueryPlan, QueryValidate},
11 response::Response,
12 store::DataKey,
13 },
14 obs::metrics,
15 traits::{EntityKind, FieldValue},
16};
17use std::{marker::PhantomData, ops::ControlFlow};
18
19struct DeleteAccumulator<'f, E> {
27 filter: Option<&'f FilterExpr>,
28 offset: usize,
29 skipped: usize,
30 limit: Option<usize>,
31 matches: Vec<(DataKey, E)>,
32}
33
34impl<'f, E: EntityKind> DeleteAccumulator<'f, E> {
35 fn new(filter: Option<&'f FilterExpr>, offset: usize, limit: Option<usize>) -> Self {
36 Self {
37 filter,
38 offset,
39 skipped: 0,
40 limit,
41 matches: Vec::with_capacity(limit.unwrap_or(0)),
42 }
43 }
44
45 fn limit_reached(&self) -> bool {
46 self.limit.is_some_and(|lim| self.matches.len() >= lim)
47 }
48
49 fn should_stop(&mut self, dk: DataKey, entity: E) -> bool {
51 if let Some(f) = self.filter
52 && !FilterEvaluator::new(&entity).eval(f)
53 {
54 return false;
55 }
56
57 if self.skipped < self.offset {
58 self.skipped += 1;
59 return false;
60 }
61
62 if self.limit_reached() {
63 return true;
64 }
65
66 self.matches.push((dk, entity));
67 false
68 }
69}
70
71#[derive(Clone, Copy)]
76pub struct DeleteExecutor<E: EntityKind> {
77 db: Db<E::Canister>,
78 debug: bool,
79 _marker: PhantomData<E>,
80}
81
82impl<E: EntityKind> DeleteExecutor<E> {
83 #[must_use]
84 pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
85 Self {
86 db,
87 debug,
88 _marker: PhantomData,
89 }
90 }
91
92 #[must_use]
94 pub const fn debug(mut self) -> Self {
95 self.debug = true;
96 self
97 }
98
99 pub fn one(self, value: impl FieldValue) -> Result<Response<E>, Error> {
105 let query = DeleteQuery::new().one::<E>(value);
106 self.execute(query)
107 }
108
109 pub fn only(self) -> Result<Response<E>, Error> {
111 let query = DeleteQuery::new().one::<E>(());
112 self.execute(query)
113 }
114
115 pub fn many(
117 self,
118 values: impl IntoIterator<Item = impl FieldValue>,
119 ) -> Result<Response<E>, Error> {
120 let query = DeleteQuery::new().many::<E>(values);
121 self.execute(query)
122 }
123
124 pub fn all(self) -> Result<Response<E>, Error> {
126 let query = DeleteQuery::new();
127 self.execute(query)
128 }
129
130 pub fn filter<F, I>(self, f: F) -> Result<Response<E>, Error>
132 where
133 F: FnOnce(FilterDsl) -> I,
134 I: IntoFilterExpr,
135 {
136 let query = DeleteQuery::new().filter(f);
137 self.execute(query)
138 }
139
140 pub fn explain(self, query: DeleteQuery) -> Result<QueryPlan, Error> {
147 QueryValidate::<E>::validate(&query)?;
148
149 Ok(plan_for::<E>(query.filter.as_ref()))
150 }
151
152 pub fn execute(self, query: DeleteQuery) -> Result<Response<E>, Error> {
155 QueryValidate::<E>::validate(&query)?;
156 let mut span = metrics::Span::<E>::new(metrics::ExecKind::Delete);
157
158 let plan = plan_for::<E>(query.filter.as_ref());
159
160 let limit = query
162 .limit
163 .as_ref()
164 .and_then(|l| l.limit)
165 .map(|l| l as usize);
166 let offset = query.limit.as_ref().map_or(0_usize, |l| l.offset as usize);
167 let filter_simplified = query.filter.as_ref().map(|f| f.clone().simplify());
168
169 let mut acc = DeleteAccumulator::new(filter_simplified.as_ref(), offset, limit);
170
171 scan_plan::<E, _>(&self.db, plan, |dk, entity| {
172 if acc.should_stop(dk, entity) {
173 ControlFlow::Break(())
174 } else {
175 ControlFlow::Continue(())
176 }
177 })?;
178
179 let mut res: Vec<(Key, E)> = Vec::with_capacity(acc.matches.len());
181 self.db.context::<E>().with_store_mut(|s| {
182 for (dk, entity) in acc.matches {
183 s.remove(&dk);
184 if !E::INDEXES.is_empty() {
185 self.remove_indexes(&entity)?;
186 }
187 res.push((dk.key(), entity));
188 }
189
190 Ok::<_, Error>(())
191 })??;
192
193 set_rows_from_len(&mut span, res.len());
194
195 Ok(Response(res))
196 }
197
198 fn remove_indexes(&self, entity: &E) -> Result<(), Error> {
200 for index in E::INDEXES {
201 let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
202
203 store.with_borrow_mut(|this| {
204 this.remove_index_entry(entity, index);
205 });
206 }
207
208 Ok(())
209 }
210}