1use crate::{
2 db::{
3 CommitDataOp, CommitIndexOp, CommitKind, CommitMarker, Db, begin_commit, ensure_recovered,
4 executor::{
5 ExecutorError, FilterEvaluator, UniqueIndexHandle, WriteUnit,
6 plan::{plan_for, record_plan_metrics, scan_strict, set_rows_from_len},
7 resolve_unique_pk,
8 },
9 finish_commit,
10 index::{
11 IndexEntry, IndexEntryCorruption, IndexKey, IndexRemoveOutcome, IndexStore,
12 RawIndexEntry, RawIndexKey,
13 },
14 primitives::FilterExpr,
15 query::{DeleteQuery, QueryPlan, QueryValidate},
16 response::Response,
17 store::DataKey,
18 traits::FromKey,
19 },
20 error::{ErrorOrigin, InternalError},
21 obs::sink::{self, ExecKind, MetricsEvent, Span},
22 prelude::*,
23 sanitize::sanitize,
24 traits::{EntityKind, FieldValue, Path},
25};
26use canic_cdk::structures::Storable;
27use std::{
28 cell::RefCell, collections::BTreeMap, marker::PhantomData, ops::ControlFlow, thread::LocalKey,
29};
30
/// Collects the rows selected for deletion while a scan feeds it one row at a
/// time through `should_stop`.
struct DeleteAccumulator<'f, E> {
    /// Optional filter; rows for which it evaluates to `false` are ignored.
    filter: Option<&'f FilterExpr>,
    /// Number of filter-passing rows to skip before collecting any.
    offset: usize,
    /// Filter-passing rows skipped so far; counts up towards `offset`.
    skipped: usize,
    /// Maximum number of rows to collect; `None` means no cap.
    limit: Option<usize>,
    /// Rows collected so far, each paired with its data key.
    matches: Vec<(DataKey, E)>,
}
43
44impl<'f, E: EntityKind> DeleteAccumulator<'f, E> {
45 fn new(filter: Option<&'f FilterExpr>, offset: usize, limit: Option<usize>) -> Self {
46 Self {
47 filter,
48 offset,
49 skipped: 0,
50 limit,
51 matches: Vec::with_capacity(limit.unwrap_or(0)),
52 }
53 }
54
55 fn limit_reached(&self) -> bool {
56 self.limit.is_some_and(|lim| self.matches.len() >= lim)
57 }
58
59 fn should_stop(&mut self, dk: DataKey, entity: E) -> bool {
60 if let Some(f) = self.filter
61 && !FilterEvaluator::new(&entity).eval(f)
62 {
63 return false;
64 }
65
66 if self.skipped < self.offset {
67 self.skipped += 1;
68 return false;
69 }
70
71 if self.limit_reached() {
72 return true;
73 }
74
75 self.matches.push((dk, entity));
76 false
77 }
78}
79
/// Pairs an index definition with the store that holds its entries.
struct IndexPlan {
    /// Static model describing the index.
    index: &'static IndexModel,
    /// Thread-local cell wrapping the `IndexStore` looked up for this index.
    store: &'static LocalKey<RefCell<IndexStore>>,
}
89
/// Executes delete operations for entities of kind `E`.
#[derive(Clone, Copy)]
pub struct DeleteExecutor<E: EntityKind> {
    /// Database handle, parameterised by `E::Canister`.
    db: Db<E::Canister>,
    /// Debug flag supplied to `new` or switched on by `debug`.
    debug: bool,
    /// Ties the executor to `E` without storing a value of that type.
    _marker: PhantomData<E>,
}
104
105impl<E: EntityKind> DeleteExecutor<E> {
106 #[must_use]
107 pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
108 Self {
109 db,
110 debug,
111 _marker: PhantomData,
112 }
113 }
114
115 #[must_use]
116 pub const fn debug(mut self) -> Self {
117 self.debug = true;
118 self
119 }
120
121 pub fn one(self, pk: impl FieldValue) -> Result<Response<E>, InternalError> {
126 self.execute(DeleteQuery::new().one::<E>(pk))
127 }
128
129 pub fn only(self) -> Result<Response<E>, InternalError> {
130 self.execute(DeleteQuery::new().one::<E>(()))
131 }
132
133 pub fn many<I, V>(self, values: I) -> Result<Response<E>, InternalError>
134 where
135 I: IntoIterator<Item = V>,
136 V: FieldValue,
137 {
138 self.execute(DeleteQuery::new().many_by_field(E::PRIMARY_KEY, values))
139 }
140
141 pub fn by_unique_index(
146 self,
147 index: UniqueIndexHandle,
148 entity: E,
149 ) -> Result<Response<E>, InternalError>
150 where
151 E::PrimaryKey: FromKey,
152 {
153 let mut span = Span::<E>::new(ExecKind::Delete);
154 ensure_recovered(&self.db)?;
155
156 let index = index.index();
157 let mut lookup = entity;
158 sanitize(&mut lookup)?;
159
160 let Some(pk) = resolve_unique_pk::<E>(&self.db, index, &lookup)? else {
161 set_rows_from_len(&mut span, 0);
162 return Ok(Response(Vec::new()));
163 };
164
165 let (dk, stored) = self.load_existing(pk)?;
166 let ctx = self.db.context::<E>();
167 let index_plans = self.build_index_plans()?;
168 let index_ops = Self::build_index_removal_ops(&index_plans, &[&stored])?;
169
170 ctx.with_store(|_| ())?;
171
172 let marker = CommitMarker::new(
173 CommitKind::Delete,
174 index_ops,
175 vec![CommitDataOp {
176 store: E::Store::PATH.to_string(),
177 key: dk.to_raw().as_bytes().to_vec(),
178 value: None,
179 }],
180 )?;
181 let commit = begin_commit(marker)?;
182
183 finish_commit(
184 commit,
185 || {
186 let _unit = WriteUnit::new("delete_unique_row_stage1_atomic");
187 for plan in &index_plans {
188 let outcome = plan
189 .store
190 .with_borrow_mut(|s| s.remove_index_entry(&stored, plan.index))
191 .expect("index remove failed after prevalidation");
192 if outcome == IndexRemoveOutcome::Removed {
193 sink::record(MetricsEvent::IndexRemove {
194 entity_path: E::PATH,
195 });
196 }
197 }
198 },
199 || {
200 ctx.with_store_mut(|s| s.remove(&dk.to_raw()))
201 .expect("data store missing after preflight");
202 },
203 );
204
205 set_rows_from_len(&mut span, 1);
206 Ok(Response(vec![(dk.key(), stored)]))
207 }
208
209 pub fn explain(self, query: DeleteQuery) -> Result<QueryPlan, InternalError> {
214 QueryValidate::<E>::validate(&query)?;
215 Ok(plan_for::<E>(query.filter.as_ref()))
216 }
217
218 pub fn execute(self, query: DeleteQuery) -> Result<Response<E>, InternalError> {
219 QueryValidate::<E>::validate(&query)?;
220 ensure_recovered(&self.db)?;
221
222 let mut span = Span::<E>::new(ExecKind::Delete);
223 let plan = plan_for::<E>(query.filter.as_ref());
224 record_plan_metrics(&plan);
225
226 let (limit, offset) = match query.limit.as_ref() {
227 Some(l) => (l.limit.map(|v| v as usize), l.offset as usize),
228 None => (None, 0),
229 };
230
231 let filter = query.filter.as_ref().map(|f| f.clone().simplify());
232 let mut acc = DeleteAccumulator::new(filter.as_ref(), offset, limit);
233 let mut scanned = 0u64;
234
235 scan_strict::<E, _>(&self.db, plan, |dk, entity| {
236 scanned += 1;
237 if acc.should_stop(dk, entity) {
238 ControlFlow::Break(())
239 } else {
240 ControlFlow::Continue(())
241 }
242 })?;
243
244 sink::record(MetricsEvent::RowsScanned {
245 entity_path: E::PATH,
246 rows_scanned: scanned,
247 });
248
249 if acc.matches.is_empty() {
250 set_rows_from_len(&mut span, 0);
251 return Ok(Response(Vec::new()));
252 }
253
254 let index_plans = self.build_index_plans()?;
255 let entities: Vec<&E> = acc.matches.iter().map(|(_, e)| e).collect();
256 let index_ops = Self::build_index_removal_ops(&index_plans, &entities)?;
257
258 let ctx = self.db.context::<E>();
259 ctx.with_store(|_| ())?;
260
261 let data_ops = acc
262 .matches
263 .iter()
264 .map(|(dk, _)| CommitDataOp {
265 store: E::Store::PATH.to_string(),
266 key: dk.to_raw().as_bytes().to_vec(),
267 value: None,
268 })
269 .collect();
270
271 let marker = CommitMarker::new(CommitKind::Delete, index_ops, data_ops)?;
272 let commit = begin_commit(marker)?;
273
274 finish_commit(
275 commit,
276 || {
277 for (_, entity) in &acc.matches {
278 let _unit = WriteUnit::new("delete_row_stage1_atomic");
279 for plan in &index_plans {
280 let outcome = plan
281 .store
282 .with_borrow_mut(|s| s.remove_index_entry(entity, plan.index))
283 .expect("index remove failed after prevalidation");
284 if outcome == IndexRemoveOutcome::Removed {
285 sink::record(MetricsEvent::IndexRemove {
286 entity_path: E::PATH,
287 });
288 }
289 }
290 }
291 },
292 || {
293 ctx.with_store_mut(|s| {
294 for (dk, _) in &acc.matches {
295 s.remove(&dk.to_raw());
296 }
297 })
298 .expect("data store missing after preflight");
299 },
300 );
301
302 let res = acc
303 .matches
304 .into_iter()
305 .map(|(dk, e)| (dk.key(), e))
306 .collect::<Vec<_>>();
307 set_rows_from_len(&mut span, res.len());
308
309 Ok(Response(res))
310 }
311
312 fn load_existing(&self, pk: E::PrimaryKey) -> Result<(DataKey, E), InternalError> {
317 let dk = DataKey::new::<E>(pk.into());
318 let row = self.db.context::<E>().read_strict(&dk)?;
319 let entity = row.try_decode::<E>().map_err(|err| {
320 ExecutorError::corruption(
321 ErrorOrigin::Serialize,
322 format!("failed to deserialize row: {dk} ({err})"),
323 )
324 })?;
325 Ok((dk, entity))
326 }
327
328 fn build_index_plans(&self) -> Result<Vec<IndexPlan>, InternalError> {
329 E::INDEXES
330 .iter()
331 .map(|index| {
332 let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
333 Ok(IndexPlan { index, store })
334 })
335 .collect()
336 }
337
338 fn build_index_removal_ops(
339 plans: &[IndexPlan],
340 entities: &[&E],
341 ) -> Result<Vec<CommitIndexOp>, InternalError> {
342 let mut ops = Vec::new();
343
344 for plan in plans {
345 let fields = plan.index.fields.join(", ");
346 let mut entries: BTreeMap<RawIndexKey, Option<IndexEntry>> = BTreeMap::new();
347
348 for entity in entities {
349 let Some(key) = IndexKey::new(*entity, plan.index) else {
350 continue;
351 };
352 let raw_key = key.to_raw();
353
354 let entry = match entries.entry(raw_key) {
355 std::collections::btree_map::Entry::Vacant(slot) => {
356 let decoded = plan
357 .store
358 .with_borrow(|s| s.get(&raw_key))
359 .map(|raw| {
360 raw.try_decode().map_err(|err| {
361 ExecutorError::corruption(
362 ErrorOrigin::Index,
363 format!(
364 "index corrupted: {} ({}) -> {}",
365 E::PATH,
366 fields,
367 err
368 ),
369 )
370 })
371 })
372 .transpose()?;
373 slot.insert(decoded)
374 }
375 std::collections::btree_map::Entry::Occupied(slot) => slot.into_mut(),
376 };
377
378 if let Some(e) = entry.as_mut() {
379 e.remove_key(&entity.key());
380 if e.is_empty() {
381 *entry = None;
382 }
383 }
384 }
385
386 for (raw_key, entry) in entries {
387 let value = if let Some(entry) = entry {
388 let raw = RawIndexEntry::try_from_entry(&entry).map_err(|err| {
389 ExecutorError::corruption(
390 ErrorOrigin::Index,
391 format!(
392 "index corrupted: {} ({}) -> {}",
393 E::PATH,
394 fields,
395 IndexEntryCorruption::TooManyKeys { count: err.keys() }
396 ),
397 )
398 })?;
399 Some(raw.into_bytes())
400 } else {
401 None
402 };
403
404 ops.push(CommitIndexOp {
405 store: plan.index.store.to_string(),
406 key: raw_key.as_bytes().to_vec(),
407 value,
408 });
409 }
410 }
411
412 Ok(ops)
413 }
414}