1use crate::{
2 db::{
3 CommitDataOp, CommitIndexOp, CommitKind, CommitMarker, Db, begin_commit, ensure_recovered,
4 executor::{
5 ExecutorError, FilterEvaluator, UniqueIndexHandle, WriteUnit,
6 plan::{
7 plan_for, record_plan_metrics, scan_missing_ok, scan_strict, set_rows_from_len,
8 },
9 resolve_unique_pk,
10 },
11 finish_commit,
12 index::{
13 IndexEntry, IndexEntryCorruption, IndexKey, IndexRemoveOutcome, IndexStore,
14 RawIndexEntry, RawIndexKey,
15 },
16 primitives::FilterExpr,
17 query::{DeleteQuery, QueryPlan, QueryValidate},
18 response::Response,
19 store::DataKey,
20 traits::FromKey,
21 },
22 error::{ErrorOrigin, InternalError},
23 obs::sink::{self, ExecKind, MetricsEvent, Span},
24 prelude::*,
25 sanitize::sanitize,
26 traits::{EntityKind, FieldValue, Path},
27};
28use canic_cdk::structures::Storable;
29use std::{
30 cell::RefCell, collections::BTreeMap, marker::PhantomData, ops::ControlFlow, thread::LocalKey,
31};
32
33struct DeleteAccumulator<'f, E> {
39 filter: Option<&'f FilterExpr>,
40 offset: usize,
41 skipped: usize,
42 limit: Option<usize>,
43 matches: Vec<(DataKey, E)>,
44}
45
46impl<'f, E: EntityKind> DeleteAccumulator<'f, E> {
47 fn new(filter: Option<&'f FilterExpr>, offset: usize, limit: Option<usize>) -> Self {
48 Self {
49 filter,
50 offset,
51 skipped: 0,
52 limit,
53 matches: Vec::with_capacity(limit.unwrap_or(0)),
54 }
55 }
56
57 fn limit_reached(&self) -> bool {
58 self.limit.is_some_and(|lim| self.matches.len() >= lim)
59 }
60
61 fn should_stop(&mut self, dk: DataKey, entity: E) -> bool {
62 if let Some(f) = self.filter
63 && !FilterEvaluator::new(&entity).eval(f)
64 {
65 return false;
66 }
67
68 if self.skipped < self.offset {
69 self.skipped += 1;
70 return false;
71 }
72
73 if self.limit_reached() {
74 return true;
75 }
76
77 self.matches.push((dk, entity));
78 false
79 }
80}
81
/// Pairs one index definition with the thread-local store that holds its
/// entries, so both can be handed around together while a delete is prepared
/// and applied.
struct IndexPlan {
    /// Static description of the index (its `fields` and `store` path are
    /// read by the delete executor).
    index: &'static IndexModel,
    /// Thread-local index store accessed through `with_borrow` /
    /// `with_borrow_mut`.
    store: &'static LocalKey<RefCell<IndexStore>>,
}
91
/// Executes delete operations for entities of type `E` against a [`Db`].
///
/// The executor is a small value type: every public operation consumes it.
#[derive(Clone, Copy)]
pub struct DeleteExecutor<E: EntityKind> {
    /// Database handle for the canister that owns `E`.
    db: Db<E::Canister>,
    /// When `true`, `debug_log` prints diagnostic lines via `println!`.
    debug: bool,
    /// Ties the executor to `E` without storing a value of that type.
    _marker: PhantomData<E>,
}
106
107impl<E: EntityKind> DeleteExecutor<E> {
108 #[must_use]
109 pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
110 Self {
111 db,
112 debug,
113 _marker: PhantomData,
114 }
115 }
116
117 #[must_use]
118 pub const fn debug(mut self) -> Self {
119 self.debug = true;
120 self
121 }
122
123 fn debug_log(&self, s: impl Into<String>) {
124 if self.debug {
125 println!("{}", s.into());
126 }
127 }
128
129 pub fn one(self, pk: impl FieldValue) -> Result<Response<E>, InternalError> {
134 self.execute(DeleteQuery::new().one::<E>(pk))
135 }
136
137 pub fn only(self) -> Result<Response<E>, InternalError> {
138 self.execute(DeleteQuery::new().one::<E>(()))
139 }
140
141 pub fn many<I, V>(self, values: I) -> Result<Response<E>, InternalError>
142 where
143 I: IntoIterator<Item = V>,
144 V: FieldValue,
145 {
146 self.execute(DeleteQuery::new().many_by_field(E::PRIMARY_KEY, values))
147 }
148
149 pub fn by_unique_index(
154 self,
155 index: UniqueIndexHandle,
156 entity: E,
157 ) -> Result<Response<E>, InternalError>
158 where
159 E::PrimaryKey: FromKey,
160 {
161 self.debug_log(format!(
162 "[debug] delete by unique index on {} ({})",
163 E::PATH,
164 index.index().fields.join(", ")
165 ));
166 let mut span = Span::<E>::new(ExecKind::Delete);
167 ensure_recovered(&self.db)?;
168
169 let index = index.index();
170 let mut lookup = entity;
171 sanitize(&mut lookup)?;
172
173 let Some(pk) = resolve_unique_pk::<E>(&self.db, index, &lookup)? else {
175 set_rows_from_len(&mut span, 0);
176 return Ok(Response(Vec::new()));
177 };
178
179 let (dk, stored) = self.load_existing(pk)?;
180 let ctx = self.db.context::<E>();
181 let index_plans = self.build_index_plans()?;
182 let index_ops = Self::build_index_removal_ops(&index_plans, &[&stored])?;
183
184 ctx.with_store(|_| ())?;
186
187 let marker = CommitMarker::new(
188 CommitKind::Delete,
189 index_ops,
190 vec![CommitDataOp {
191 store: E::Store::PATH.to_string(),
192 key: dk.to_raw().as_bytes().to_vec(),
193 value: None,
194 }],
195 )?;
196 let commit = begin_commit(marker)?;
197
198 finish_commit(
199 commit,
200 || {
201 let _unit = WriteUnit::new("delete_unique_row_stage1_atomic");
202 for plan in &index_plans {
203 let outcome = plan
204 .store
205 .with_borrow_mut(|s| s.remove_index_entry(&stored, plan.index))
206 .expect("index remove failed after prevalidation");
207 if outcome == IndexRemoveOutcome::Removed {
208 sink::record(MetricsEvent::IndexRemove {
209 entity_path: E::PATH,
210 });
211 }
212 }
213 },
214 || {
215 ctx.with_store_mut(|s| s.remove(&dk.to_raw()))
216 .expect("data store missing after preflight");
217 },
218 );
219
220 set_rows_from_len(&mut span, 1);
221 Ok(Response(vec![(dk.key(), stored)]))
222 }
223
224 pub fn explain(self, query: DeleteQuery) -> Result<QueryPlan, InternalError> {
229 QueryValidate::<E>::validate(&query)?;
230 Ok(plan_for::<E>(query.filter.as_ref()))
231 }
232
233 pub fn execute(self, query: DeleteQuery) -> Result<Response<E>, InternalError> {
234 QueryValidate::<E>::validate(&query)?;
236 ensure_recovered(&self.db)?;
237
238 self.debug_log(format!("[debug] delete query {:?} on {}", query, E::PATH));
239
240 let mut span = Span::<E>::new(ExecKind::Delete);
241
242 let plan = plan_for::<E>(query.filter.as_ref());
244 record_plan_metrics(&plan);
245
246 self.debug_log(format!("[debug] delete plan: {plan:?}"));
247
248 let (limit, offset) = match query.limit.as_ref() {
250 Some(l) => (l.limit.map(|v| v as usize), l.offset as usize),
251 None => (None, 0),
252 };
253
254 let filter = query.filter.as_ref().map(|f| f.clone().simplify());
256 let mut acc = DeleteAccumulator::new(filter.as_ref(), offset, limit);
257 let mut scanned = 0u64;
258
259 match plan {
261 QueryPlan::Keys(keys) => {
262 scan_missing_ok::<E, _>(&self.db, QueryPlan::Keys(keys), |dk, entity| {
264 scanned += 1;
265 if acc.should_stop(dk, entity) {
266 ControlFlow::Break(())
267 } else {
268 ControlFlow::Continue(())
269 }
270 })?;
271 }
272 plan => {
273 scan_strict::<E, _>(&self.db, plan, |dk, entity| {
275 scanned += 1;
276 if acc.should_stop(dk, entity) {
277 ControlFlow::Break(())
278 } else {
279 ControlFlow::Continue(())
280 }
281 })?;
282 }
283 }
284
285 sink::record(MetricsEvent::RowsScanned {
287 entity_path: E::PATH,
288 rows_scanned: scanned,
289 });
290
291 if acc.matches.is_empty() {
293 set_rows_from_len(&mut span, 0);
294 return Ok(Response(Vec::new()));
295 }
296
297 let index_plans = self.build_index_plans()?;
299 let entities: Vec<&E> = acc.matches.iter().map(|(_, e)| e).collect();
300 let index_ops = Self::build_index_removal_ops(&index_plans, &entities)?;
301
302 let ctx = self.db.context::<E>();
304 ctx.with_store(|_| ())?;
305
306 let data_ops = acc
307 .matches
308 .iter()
309 .map(|(dk, _)| CommitDataOp {
310 store: E::Store::PATH.to_string(),
311 key: dk.to_raw().as_bytes().to_vec(),
312 value: None,
313 })
314 .collect();
315
316 let marker = CommitMarker::new(CommitKind::Delete, index_ops, data_ops)?;
317 let commit = begin_commit(marker)?;
318
319 finish_commit(
321 commit,
322 || {
323 for (_, entity) in &acc.matches {
324 let _unit = WriteUnit::new("delete_row_stage1_atomic");
325 for plan in &index_plans {
326 let outcome = plan
327 .store
328 .with_borrow_mut(|s| s.remove_index_entry(entity, plan.index))
329 .expect("index remove failed after prevalidation");
330 if outcome == IndexRemoveOutcome::Removed {
331 sink::record(MetricsEvent::IndexRemove {
332 entity_path: E::PATH,
333 });
334 }
335 }
336 }
337 },
338 || {
339 ctx.with_store_mut(|s| {
340 for (dk, _) in &acc.matches {
341 s.remove(&dk.to_raw());
342 }
343 })
344 .expect("data store missing after preflight");
345 },
346 );
347
348 let res = acc
350 .matches
351 .into_iter()
352 .map(|(dk, e)| (dk.key(), e))
353 .collect::<Vec<_>>();
354 set_rows_from_len(&mut span, res.len());
355
356 Ok(Response(res))
357 }
358
359 fn load_existing(&self, pk: E::PrimaryKey) -> Result<(DataKey, E), InternalError> {
364 let dk = DataKey::new::<E>(pk.into());
365 let row = self.db.context::<E>().read_strict(&dk)?;
366 let entity = row.try_decode::<E>().map_err(|err| {
367 ExecutorError::corruption(
368 ErrorOrigin::Serialize,
369 format!("failed to deserialize row: {dk} ({err})"),
370 )
371 })?;
372 Ok((dk, entity))
373 }
374
375 fn build_index_plans(&self) -> Result<Vec<IndexPlan>, InternalError> {
376 E::INDEXES
377 .iter()
378 .map(|index| {
379 let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
380 Ok(IndexPlan { index, store })
381 })
382 .collect()
383 }
384
385 fn build_index_removal_ops(
386 plans: &[IndexPlan],
387 entities: &[&E],
388 ) -> Result<Vec<CommitIndexOp>, InternalError> {
389 let mut ops = Vec::new();
390
391 for plan in plans {
393 let fields = plan.index.fields.join(", ");
394
395 let mut entries: BTreeMap<RawIndexKey, Option<IndexEntry>> = BTreeMap::new();
397
398 for entity in entities {
400 let Some(key) = IndexKey::new(*entity, plan.index) else {
401 continue;
402 };
403 let raw_key = key.to_raw();
404
405 let entry = match entries.entry(raw_key) {
407 std::collections::btree_map::Entry::Vacant(slot) => {
408 let decoded = plan
409 .store
410 .with_borrow(|s| s.get(&raw_key))
411 .map(|raw| {
412 raw.try_decode().map_err(|err| {
413 ExecutorError::corruption(
414 ErrorOrigin::Index,
415 format!(
416 "index corrupted: {} ({}) -> {}",
417 E::PATH,
418 fields,
419 err
420 ),
421 )
422 })
423 })
424 .transpose()?;
425 slot.insert(decoded)
426 }
427 std::collections::btree_map::Entry::Occupied(slot) => slot.into_mut(),
428 };
429
430 if let Some(e) = entry.as_mut() {
432 e.remove_key(&entity.key());
433 if e.is_empty() {
434 *entry = None;
435 }
436 }
437 }
438
439 for (raw_key, entry) in entries {
441 let value = if let Some(entry) = entry {
442 let raw = RawIndexEntry::try_from_entry(&entry).map_err(|err| {
443 ExecutorError::corruption(
444 ErrorOrigin::Index,
445 format!(
446 "index corrupted: {} ({}) -> {}",
447 E::PATH,
448 fields,
449 IndexEntryCorruption::TooManyKeys { count: err.keys() }
450 ),
451 )
452 })?;
453 Some(raw.into_bytes())
454 } else {
455 None
457 };
458
459 ops.push(CommitIndexOp {
460 store: plan.index.store.to_string(),
461 key: raw_key.as_bytes().to_vec(),
462 value,
463 });
464 }
465 }
466
467 Ok(ops)
468 }
469}