1use crate::{
2 Error, Key,
3 db::{
4 Db,
5 executor::{
6 FilterEvaluator,
7 plan::{plan_for, scan_plan, set_rows_from_len},
8 },
9 primitives::{FilterDsl, FilterExpr, FilterExt, IntoFilterExpr},
10 query::{DeleteQuery, QueryPlan, QueryValidate},
11 response::Response,
12 store::DataKey,
13 },
14 obs::metrics,
15 traits::{EntityKind, FieldValue},
16};
17use std::{marker::PhantomData, ops::ControlFlow};
18
19struct DeleteAccumulator<'f, E> {
27 filter: Option<&'f FilterExpr>,
28 offset: usize,
29 skipped: usize,
30 limit: Option<usize>,
31 matches: Vec<(DataKey, E)>,
32}
33
34impl<'f, E: EntityKind> DeleteAccumulator<'f, E> {
35 fn new(filter: Option<&'f FilterExpr>, offset: usize, limit: Option<usize>) -> Self {
36 Self {
37 filter,
38 offset,
39 skipped: 0,
40 limit,
41 matches: Vec::with_capacity(limit.unwrap_or(0)),
42 }
43 }
44
45 fn limit_reached(&self) -> bool {
46 self.limit.is_some_and(|lim| self.matches.len() >= lim)
47 }
48
49 fn should_stop(&mut self, dk: DataKey, entity: E) -> bool {
51 if let Some(f) = self.filter
52 && !FilterEvaluator::new(&entity).eval(f)
53 {
54 return false;
55 }
56
57 if self.skipped < self.offset {
58 self.skipped += 1;
59 return false;
60 }
61
62 if self.limit_reached() {
63 return true;
64 }
65
66 self.matches.push((dk, entity));
67 false
68 }
69}
70
71#[derive(Clone, Copy)]
76pub struct DeleteExecutor<E: EntityKind> {
77 db: Db<E::Canister>,
78 debug: bool,
79 _marker: PhantomData<E>,
80}
81
82impl<E: EntityKind> DeleteExecutor<E> {
83 #[must_use]
84 pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
85 Self {
86 db,
87 debug,
88 _marker: PhantomData,
89 }
90 }
91
92 #[must_use]
94 pub const fn debug(mut self) -> Self {
95 self.debug = true;
96 self
97 }
98
99 pub fn one(self, value: impl FieldValue) -> Result<Response<E>, Error> {
105 let query = DeleteQuery::new().one::<E>(value);
106 self.execute(query)
107 }
108
109 pub fn only(self) -> Result<Response<E>, Error> {
111 let query = DeleteQuery::new().one::<E>(());
112 self.execute(query)
113 }
114
115 pub fn many(
117 self,
118 values: impl IntoIterator<Item = impl FieldValue>,
119 ) -> Result<Response<E>, Error> {
120 let query = DeleteQuery::new().many::<E>(values);
121 self.execute(query)
122 }
123
124 pub fn all(self) -> Result<Response<E>, Error> {
126 let query = DeleteQuery::new();
127 self.execute(query)
128 }
129
130 pub fn filter<F, I>(self, f: F) -> Result<Response<E>, Error>
132 where
133 F: FnOnce(FilterDsl) -> I,
134 I: IntoFilterExpr,
135 {
136 let query = DeleteQuery::new().filter(f);
137 self.execute(query)
138 }
139
140 pub fn ensure_delete_one(self, pk: impl FieldValue) -> Result<(), Error> {
145 self.one(pk)?.require_one()?;
146
147 Ok(())
148 }
149
150 pub fn ensure_delete_any(
151 self,
152 pks: impl IntoIterator<Item = impl FieldValue>,
153 ) -> Result<(), Error> {
154 self.many(pks)?.require_some()?;
155
156 Ok(())
157 }
158
159 pub fn explain(self, query: DeleteQuery) -> Result<QueryPlan, Error> {
161 QueryValidate::<E>::validate(&query)?;
162
163 Ok(plan_for::<E>(query.filter.as_ref()))
164 }
165
166 pub fn execute(self, query: DeleteQuery) -> Result<Response<E>, Error> {
168 QueryValidate::<E>::validate(&query)?;
169 let mut span = metrics::Span::<E>::new(metrics::ExecKind::Delete);
170
171 let plan = plan_for::<E>(query.filter.as_ref());
172
173 let limit = query
175 .limit
176 .as_ref()
177 .and_then(|l| l.limit)
178 .map(|l| l as usize);
179 let offset = query.limit.as_ref().map_or(0_usize, |l| l.offset as usize);
180 let filter_simplified = query.filter.as_ref().map(|f| f.clone().simplify());
181
182 let mut acc = DeleteAccumulator::new(filter_simplified.as_ref(), offset, limit);
183
184 scan_plan::<E, _>(&self.db, plan, |dk, entity| {
185 if acc.should_stop(dk, entity) {
186 ControlFlow::Break(())
187 } else {
188 ControlFlow::Continue(())
189 }
190 })?;
191
192 let mut res: Vec<(Key, E)> = Vec::with_capacity(acc.matches.len());
194 self.db.context::<E>().with_store_mut(|s| {
195 for (dk, entity) in acc.matches {
196 s.remove(&dk);
197 if !E::INDEXES.is_empty() {
198 self.remove_indexes(&entity)?;
199 }
200 res.push((dk.key(), entity));
201 }
202
203 Ok::<_, Error>(())
204 })??;
205
206 set_rows_from_len(&mut span, res.len());
207
208 Ok(Response(res))
209 }
210
211 fn remove_indexes(&self, entity: &E) -> Result<(), Error> {
213 for index in E::INDEXES {
214 let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
215
216 store.with_borrow_mut(|this| {
217 this.remove_index_entry(entity, index);
218 });
219 }
220
221 Ok(())
222 }
223}