1use crate::{
2 Error, Key,
3 db::{
4 Db,
5 executor::{
6 FilterEvaluator,
7 plan::{plan_for, scan_plan, set_rows_from_len},
8 },
9 primitives::{FilterDsl, FilterExpr, FilterExt, IntoFilterExpr},
10 query::{DeleteQuery, QueryPlan, QueryValidate},
11 response::Response,
12 store::DataKey,
13 },
14 obs::metrics,
15 traits::{EntityKind, FieldValue},
16 view::View,
17};
18use std::{marker::PhantomData, ops::ControlFlow};
19
20struct DeleteAccumulator<'f, E> {
28 filter: Option<&'f FilterExpr>,
29 offset: usize,
30 skipped: usize,
31 limit: Option<usize>,
32 matches: Vec<(DataKey, E)>,
33}
34
35impl<'f, E: EntityKind> DeleteAccumulator<'f, E> {
36 fn new(filter: Option<&'f FilterExpr>, offset: usize, limit: Option<usize>) -> Self {
37 Self {
38 filter,
39 offset,
40 skipped: 0,
41 limit,
42 matches: Vec::with_capacity(limit.unwrap_or(0)),
43 }
44 }
45
46 fn limit_reached(&self) -> bool {
47 self.limit.is_some_and(|lim| self.matches.len() >= lim)
48 }
49
50 fn should_stop(&mut self, dk: DataKey, entity: E) -> bool {
52 if let Some(f) = self.filter
53 && !FilterEvaluator::new(&entity).eval(f)
54 {
55 return false;
56 }
57
58 if self.skipped < self.offset {
59 self.skipped += 1;
60 return false;
61 }
62
63 if self.limit_reached() {
64 return true;
65 }
66
67 self.matches.push((dk, entity));
68 false
69 }
70}
71
72#[derive(Clone, Copy)]
77pub struct DeleteExecutor<E: EntityKind> {
78 db: Db<E::Canister>,
79 debug: bool,
80 _marker: PhantomData<E>,
81}
82
83impl<E: EntityKind> DeleteExecutor<E> {
84 #[must_use]
85 pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
86 Self {
87 db,
88 debug,
89 _marker: PhantomData,
90 }
91 }
92
93 #[must_use]
95 pub const fn debug(mut self) -> Self {
96 self.debug = true;
97 self
98 }
99
100 pub fn one_key(self, value: impl FieldValue) -> Result<Key, Error> {
106 self.one(value)?.try_key()
107 }
108
109 pub fn one_entity(self, value: impl FieldValue) -> Result<E, Error> {
111 self.one(value)?.try_entity()
112 }
113
114 pub fn one_view(self, value: impl FieldValue) -> Result<View<E>, Error> {
116 self.one(value)?.try_view()
117 }
118
119 pub fn one(self, value: impl FieldValue) -> Result<Response<E>, Error> {
125 let query = DeleteQuery::new().one::<E>(value);
126 self.execute(query)
127 }
128
129 pub fn only(self) -> Result<Response<E>, Error> {
131 let query = DeleteQuery::new().one::<E>(());
132 self.execute(query)
133 }
134
135 pub fn many(
137 self,
138 values: impl IntoIterator<Item = impl FieldValue>,
139 ) -> Result<Response<E>, Error> {
140 let query = DeleteQuery::new().many::<E>(values);
141 self.execute(query)
142 }
143
144 pub fn all(self) -> Result<Response<E>, Error> {
146 let query = DeleteQuery::new();
147 self.execute(query)
148 }
149
150 pub fn filter<F, I>(self, f: F) -> Result<Response<E>, Error>
152 where
153 F: FnOnce(FilterDsl) -> I,
154 I: IntoFilterExpr,
155 {
156 let query = DeleteQuery::new().filter(f);
157 self.execute(query)
158 }
159
160 pub fn explain(self, query: DeleteQuery) -> Result<QueryPlan, Error> {
167 QueryValidate::<E>::validate(&query)?;
168
169 Ok(plan_for::<E>(query.filter.as_ref()))
170 }
171
172 pub fn execute(self, query: DeleteQuery) -> Result<Response<E>, Error> {
175 QueryValidate::<E>::validate(&query)?;
176 let mut span = metrics::Span::<E>::new(metrics::ExecKind::Delete);
177
178 let plan = plan_for::<E>(query.filter.as_ref());
179
180 let limit = query
182 .limit
183 .as_ref()
184 .and_then(|l| l.limit)
185 .map(|l| l as usize);
186 let offset = query.limit.as_ref().map_or(0_usize, |l| l.offset as usize);
187 let filter_simplified = query.filter.as_ref().map(|f| f.clone().simplify());
188
189 let mut acc = DeleteAccumulator::new(filter_simplified.as_ref(), offset, limit);
190
191 scan_plan::<E, _>(&self.db, plan, |dk, entity| {
192 if acc.should_stop(dk, entity) {
193 ControlFlow::Break(())
194 } else {
195 ControlFlow::Continue(())
196 }
197 })?;
198
199 let mut res: Vec<(Key, E)> = Vec::with_capacity(acc.matches.len());
201 self.db.context::<E>().with_store_mut(|s| {
202 for (dk, entity) in acc.matches {
203 s.remove(&dk);
204 if !E::INDEXES.is_empty() {
205 self.remove_indexes(&entity)?;
206 }
207 res.push((dk.key(), entity));
208 }
209
210 Ok::<_, Error>(())
211 })??;
212
213 set_rows_from_len(&mut span, res.len());
214
215 Ok(Response(res))
216 }
217
218 fn remove_indexes(&self, entity: &E) -> Result<(), Error> {
220 for index in E::INDEXES {
221 let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
222
223 store.with_borrow_mut(|this| {
224 this.remove_index_entry(entity, index);
225 });
226 }
227
228 Ok(())
229 }
230}