1use crate::{
2 db::{
3 CommitDataOp, CommitIndexOp, CommitKind, CommitMarker, Db, WriteUnit, begin_commit,
4 ensure_recovered,
5 executor::{
6 Context, ExecutorError, UniqueIndexHandle,
7 plan::{record_plan_metrics, set_rows_from_len},
8 resolve_unique_pk,
9 trace::{
10 QueryTraceSink, TraceAccess, TraceExecutorKind, start_exec_trace, start_plan_trace,
11 },
12 },
13 finish_commit,
14 index::{
15 IndexEntry, IndexEntryCorruption, IndexKey, IndexStore, MAX_INDEX_ENTRY_BYTES,
16 RawIndexEntry, RawIndexKey,
17 },
18 query::plan::ExecutablePlan,
19 response::Response,
20 store::{DataKey, DataRow, RawDataKey, RawRow},
21 traits::FromKey,
22 },
23 error::{ErrorClass, ErrorOrigin, InternalError},
24 obs::sink::{self, ExecKind, MetricsEvent, Span},
25 prelude::*,
26 sanitize::sanitize,
27 traits::{EntityKind, Path, Storable},
28};
29use std::{
30 borrow::Cow, cell::RefCell, collections::BTreeMap, marker::PhantomData, thread::LocalKey,
31};
32
/// An index definition of `E` paired with the thread-local store holding its entries.
///
/// Built by `DeleteExecutor::build_index_plans`, which resolves `index.store` through the
/// database's index registry.
struct IndexPlan {
    /// Static model of the index (name, fields, uniqueness, store name).
    index: &'static IndexModel,
    /// Thread-local handle to the index store named by `index.store`.
    store: &'static LocalKey<RefCell<IndexStore>>,
}
42
/// Pre-delete snapshot of one index key, replayed if the write unit rolls back.
///
/// On rollback, `Some(value)` is re-inserted under `key` and `None` removes `key`
/// (see `DeleteExecutor::apply_index_rollbacks`).
struct PreparedIndexRollback {
    /// Store the key lives in.
    store: &'static LocalKey<RefCell<IndexStore>>,
    /// Raw index key touched by the commit marker.
    key: RawIndexKey,
    /// Entry stored under `key` before the delete was applied.
    value: Option<RawIndexEntry>,
}
49
/// A deleted row's raw bytes, re-inserted under `key` if the write unit rolls back.
struct PreparedDataRollback {
    /// Raw data key of the deleted row.
    key: RawDataKey,
    /// Row bytes as stored before the delete.
    value: RawRow,
}
55
/// A row selected for deletion by `DeleteExecutor::execute`.
struct DeleteRow<E> {
    /// Data key the row was read under.
    key: DataKey,
    /// Raw row bytes; `execute` takes them out (leaving `None`) to build rollback state.
    raw: Option<RawRow>,
    /// Decoded entity, returned to the caller once the delete commits.
    entity: E,
}
62
// Lets plan post-processing (presumably what `apply_post_access` in `execute` relies on —
// confirm) inspect the decoded entity of each candidate row.
impl<E: EntityKind> crate::db::query::plan::logical::PlanRow<E> for DeleteRow<E> {
    /// Returns the decoded entity carried by this row.
    fn entity(&self) -> &E {
        &self.entity
    }
}
68
/// Executes deletes of entity type `E` against `db`.
///
/// Deletes are written through a commit marker (index ops plus data ops) and applied inside a
/// `WriteUnit` that registers a rollback before each apply step.
#[derive(Clone, Copy)]
pub struct DeleteExecutor<E: EntityKind> {
    /// Database handle for `E`'s canister.
    db: Db<E::Canister>,
    /// When `true`, `debug_log` prints diagnostics to stdout.
    debug: bool,
    /// Optional sink that receives the result count or error of each execution.
    trace: Option<&'static dyn QueryTraceSink>,
    _marker: PhantomData<E>,
}
84
85impl<E: EntityKind> DeleteExecutor<E> {
86 #[must_use]
87 pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
88 Self {
89 db,
90 debug,
91 trace: None,
92 _marker: PhantomData,
93 }
94 }
95
    /// Returns this executor with its trace sink replaced by `sink` (`None` disables tracing).
    #[must_use]
    // NOTE(review): `dead_code` is allowed, presumably because some builds have no caller — confirm.
    #[allow(dead_code)]
    pub(crate) const fn with_trace_sink(
        mut self,
        sink: Option<&'static dyn QueryTraceSink>,
    ) -> Self {
        self.trace = sink;
        self
    }
105
    /// Returns this executor with debug logging switched on.
    #[must_use]
    pub const fn debug(mut self) -> Self {
        self.debug = true;
        self
    }
111
112 fn debug_log(&self, s: impl Into<String>) {
113 if self.debug {
114 println!("{}", s.into());
115 }
116 }
117
    /// Deletes the single row whose unique-index key matches `entity`.
    ///
    /// `entity` is sanitized and used only as the lookup value for `index`. The row actually
    /// deleted, and returned, is the one currently stored under the resolved primary key.
    ///
    /// The delete is described by a commit marker (index removal ops plus one data delete op).
    /// It is applied inside a `WriteUnit` that records a rollback before each apply step.
    ///
    /// Returns a response holding the deleted `(key, entity)` pair, or an empty response when
    /// the index has no match.
    ///
    /// # Errors
    /// Propagates failures from recovery, sanitization, unique-key resolution, row loading
    /// (including decode corruption), index-removal planning, marker construction and
    /// validation, and commit application.
    pub fn by_unique_index(
        self,
        index: UniqueIndexHandle,
        entity: E,
    ) -> Result<Response<E>, InternalError>
    where
        E::PrimaryKey: FromKey,
    {
        // Start a trace (only if a sink is configured) tagged with the unique index used.
        let trace = start_exec_trace(
            self.trace,
            TraceExecutorKind::Delete,
            E::PATH,
            Some(TraceAccess::UniqueIndex {
                name: index.index().name,
            }),
            Some(index.index().name),
        );
        // Run the body in an immediately-invoked closure so that every `?` exit still flows
        // through the trace-finish logic below.
        let result = (|| {
            self.debug_log(format!(
                "[debug] delete by unique index on {} ({})",
                E::PATH,
                index.index().fields.join(", ")
            ));
            let mut span = Span::<E>::new(ExecKind::Delete);
            ensure_recovered(&self.db)?;

            let index = index.index();
            let mut lookup = entity;
            sanitize(&mut lookup)?;

            // No matching key: nothing to delete.
            let Some(pk) = resolve_unique_pk::<E>(&self.db, index, &lookup)? else {
                set_rows_from_len(&mut span, 0);
                return Ok(Response(Vec::new()));
            };

            // Keep the raw row for rollback and the decoded entity for index removal/response.
            let (dk, stored_row, stored) = self.load_existing(pk)?;
            let ctx = self.db.context::<E>();
            let index_plans = self.build_index_plans()?;
            let (index_ops, index_remove_count) =
                Self::build_index_removal_ops(&index_plans, &[&stored])?;

            // NOTE(review): no-op store access; presumably fails early if the data store is
            // unavailable, before any commit marker is written — confirm.
            ctx.with_store(|_| ())?;

            let raw_key = dk.to_raw()?;
            let marker = CommitMarker::new(
                CommitKind::Delete,
                index_ops,
                vec![CommitDataOp {
                    store: E::Store::PATH.to_string(),
                    key: raw_key.as_bytes().to_vec(),
                    value: None,
                }],
            )?;
            // Validate the marker and snapshot the current state before anything is mutated.
            let (index_apply_stores, index_rollback_ops) =
                Self::prepare_index_delete_ops(&index_plans, &marker.index_ops)?;
            let mut rollback_rows = BTreeMap::new();
            rollback_rows.insert(raw_key, stored_row);
            let data_rollback_ops =
                Self::prepare_data_delete_ops(&marker.data_ops, &rollback_rows)?;
            let commit = begin_commit(marker)?;

            finish_commit(commit, |guard| {
                let mut unit = WriteUnit::new("delete_unique_row_atomic");

                // Register the index rollback before touching the index stores.
                let index_rollback_ops = index_rollback_ops;
                unit.record_rollback(move || Self::apply_index_rollbacks(index_rollback_ops));
                Self::apply_marker_index_ops(&guard.marker.index_ops, index_apply_stores);
                for _ in 0..index_remove_count {
                    sink::record(MetricsEvent::IndexRemove {
                        entity_path: E::PATH,
                    });
                }

                unit.checkpoint("delete_unique_after_indexes")?;

                // Register the data rollback (re-insert the raw row) before removing the row.
                let data_rollback_ops = data_rollback_ops;
                let db = self.db;
                unit.record_rollback(move || Self::apply_data_rollbacks(db, data_rollback_ops));
                unit.run(|| Self::apply_marker_data_ops(&guard.marker.data_ops, &ctx))?;

                unit.checkpoint("delete_unique_after_data")?;
                unit.commit();
                Ok(())
            })?;

            set_rows_from_len(&mut span, 1);
            Ok(Response(vec![(dk.key(), stored)]))
        })();

        // Report the outcome to the trace sink; a row count that does not fit in u64 saturates.
        if let Some(trace) = trace {
            match &result {
                Ok(resp) => trace.finish(u64::try_from(resp.0.len()).unwrap_or(u64::MAX)),
                Err(err) => trace.error(err),
            }
        }

        result
    }
225
    /// Deletes every row selected by `plan` and returns the deleted `(key, entity)` pairs.
    ///
    /// Rows are loaded through the plan's access path and decoded (with a key-consistency
    /// check), then filtered or limited by the plan's post-access stage.
    ///
    /// The remaining rows are removed through a single commit marker covering all their index
    /// entries and data rows. The marker is applied inside a `WriteUnit` with recorded rollbacks.
    /// An empty selection returns an empty response without writing a marker.
    ///
    /// # Errors
    /// Propagates failures from recovery, row access, decoding (corruption), post-access
    /// processing, index-removal planning, marker construction and validation, and commit
    /// application.
    #[allow(clippy::too_many_lines)]
    pub fn execute(self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
        let trace = start_plan_trace(self.trace, TraceExecutorKind::Delete, &plan);
        // Immediately-invoked closure: every `?` exit still reaches the trace reporting below.
        let result = (|| {
            let plan = plan.into_inner();
            ensure_recovered(&self.db)?;

            self.debug_log(format!("[debug] delete plan on {}", E::PATH));

            let mut span = Span::<E>::new(ExecKind::Delete);
            record_plan_metrics(&plan.access);

            let ctx = self.db.context::<E>();
            let data_rows = ctx.rows_from_access_plan(&plan.access, plan.consistency)?;
            sink::record(MetricsEvent::RowsScanned {
                entity_path: E::PATH,
                rows_scanned: data_rows.len() as u64,
            });

            let mut rows = decode_rows::<E>(data_rows)?;

            let stats = plan.apply_post_access::<E, _>(&mut rows)?;
            if stats.delete_limited {
                self.debug_log(format!(
                    "applied delete limit -> {} entities selected",
                    rows.len()
                ));
            }

            if rows.is_empty() {
                set_rows_from_len(&mut span, 0);
                return Ok(Response(Vec::new()));
            }

            let index_plans = self.build_index_plans()?;
            let (index_ops, index_remove_count) = {
                let entities: Vec<&E> = rows.iter().map(|row| &row.entity).collect();
                Self::build_index_removal_ops(&index_plans, &entities)?
            };

            // NOTE(review): no-op store access; presumably fails early if the data store is
            // unavailable, before any commit marker is written — confirm.
            ctx.with_store(|_| ())?;

            // Build one delete data op per row. Each row's raw bytes are moved out (leaving
            // `raw == None`) into `rollback_rows` so they can be restored on rollback.
            let mut rollback_rows = BTreeMap::new();
            let data_ops = rows
                .iter_mut()
                .map(|row| {
                    let raw_key = row.key.to_raw()?;
                    let raw_row = row.raw.take().ok_or_else(|| {
                        InternalError::new(
                            ErrorClass::Internal,
                            ErrorOrigin::Store,
                            "missing raw row for delete rollback".to_string(),
                        )
                    })?;
                    rollback_rows.insert(raw_key, raw_row);
                    Ok(CommitDataOp {
                        store: E::Store::PATH.to_string(),
                        key: raw_key.as_bytes().to_vec(),
                        value: None,
                    })
                })
                .collect::<Result<Vec<_>, InternalError>>()?;

            let marker = CommitMarker::new(CommitKind::Delete, index_ops, data_ops)?;
            // Validate the marker and snapshot rollback state before anything is mutated.
            let (index_apply_stores, index_rollback_ops) =
                Self::prepare_index_delete_ops(&index_plans, &marker.index_ops)?;
            let data_rollback_ops =
                Self::prepare_data_delete_ops(&marker.data_ops, &rollback_rows)?;
            let commit = begin_commit(marker)?;

            finish_commit(commit, |guard| {
                let mut unit = WriteUnit::new("delete_rows_atomic");

                // Register the index rollback before touching the index stores.
                let index_rollback_ops = index_rollback_ops;
                unit.record_rollback(move || Self::apply_index_rollbacks(index_rollback_ops));
                Self::apply_marker_index_ops(&guard.marker.index_ops, index_apply_stores);
                for _ in 0..index_remove_count {
                    sink::record(MetricsEvent::IndexRemove {
                        entity_path: E::PATH,
                    });
                }

                unit.checkpoint("delete_after_indexes")?;

                // Register the data rollback (re-insert raw rows) before removing the rows.
                let data_rollback_ops = data_rollback_ops;
                let db = self.db;
                unit.record_rollback(move || Self::apply_data_rollbacks(db, data_rollback_ops));
                unit.run(|| Self::apply_marker_data_ops(&guard.marker.data_ops, &ctx))?;

                unit.checkpoint("delete_after_data")?;
                unit.commit();

                Ok(())
            })?;

            let res = rows
                .into_iter()
                .map(|row| (row.key.key(), row.entity))
                .collect::<Vec<_>>();
            set_rows_from_len(&mut span, res.len());

            Ok(Response(res))
        })();

        // Report the outcome to the trace sink; a row count that does not fit in u64 saturates.
        if let Some(trace) = trace {
            match &result {
                Ok(resp) => trace.finish(u64::try_from(resp.0.len()).unwrap_or(u64::MAX)),
                Err(err) => trace.error(err),
            }
        }

        result
    }
346
347 #[expect(clippy::type_complexity)]
353 fn prepare_index_delete_ops(
354 plans: &[IndexPlan],
355 ops: &[CommitIndexOp],
356 ) -> Result<
357 (
358 Vec<&'static LocalKey<RefCell<IndexStore>>>,
359 Vec<PreparedIndexRollback>,
360 ),
361 InternalError,
362 > {
363 let mut stores = BTreeMap::new();
365 for plan in plans {
366 stores.insert(plan.index.store, plan.store);
367 }
368
369 let mut apply_stores = Vec::with_capacity(ops.len());
370 let mut rollbacks = Vec::with_capacity(ops.len());
371
372 for op in ops {
374 let store = stores.get(op.store.as_str()).ok_or_else(|| {
375 InternalError::new(
376 ErrorClass::Internal,
377 ErrorOrigin::Index,
378 format!(
379 "commit marker references unknown index store '{}' ({})",
380 op.store,
381 E::PATH
382 ),
383 )
384 })?;
385 if op.key.len() != IndexKey::STORED_SIZE as usize {
386 return Err(InternalError::new(
387 ErrorClass::Internal,
388 ErrorOrigin::Index,
389 format!(
390 "commit marker index key length {} does not match {} ({})",
391 op.key.len(),
392 IndexKey::STORED_SIZE,
393 E::PATH
394 ),
395 ));
396 }
397 if let Some(value) = &op.value
398 && value.len() > MAX_INDEX_ENTRY_BYTES as usize
399 {
400 return Err(InternalError::new(
401 ErrorClass::Internal,
402 ErrorOrigin::Index,
403 format!(
404 "commit marker index entry exceeds max size: {} bytes ({})",
405 value.len(),
406 E::PATH
407 ),
408 ));
409 }
410
411 let raw_key = RawIndexKey::from_bytes(Cow::Borrowed(op.key.as_slice()));
412 let rollback_value = store.with_borrow(|s| s.get(&raw_key)).ok_or_else(|| {
413 InternalError::new(
414 ErrorClass::Internal,
415 ErrorOrigin::Index,
416 format!(
417 "commit marker index op missing entry before delete: {} ({})",
418 op.store,
419 E::PATH
420 ),
421 )
422 })?;
423
424 apply_stores.push(*store);
425 rollbacks.push(PreparedIndexRollback {
426 store,
427 key: raw_key,
428 value: Some(rollback_value),
429 });
430 }
431
432 Ok((apply_stores, rollbacks))
433 }
434
435 fn prepare_data_delete_ops(
437 ops: &[CommitDataOp],
438 rollback_rows: &BTreeMap<RawDataKey, RawRow>,
439 ) -> Result<Vec<PreparedDataRollback>, InternalError> {
440 let mut rollbacks = Vec::with_capacity(ops.len());
441
442 for op in ops {
444 if op.store != E::Store::PATH {
445 return Err(InternalError::new(
446 ErrorClass::Internal,
447 ErrorOrigin::Store,
448 format!(
449 "commit marker references unexpected data store '{}' ({})",
450 op.store,
451 E::PATH
452 ),
453 ));
454 }
455 if op.key.len() != DataKey::STORED_SIZE as usize {
456 return Err(InternalError::new(
457 ErrorClass::Internal,
458 ErrorOrigin::Store,
459 format!(
460 "commit marker data key length {} does not match {} ({})",
461 op.key.len(),
462 DataKey::STORED_SIZE,
463 E::PATH
464 ),
465 ));
466 }
467 if op.value.is_some() {
468 return Err(InternalError::new(
469 ErrorClass::Internal,
470 ErrorOrigin::Store,
471 format!("commit marker delete includes data payload ({})", E::PATH),
472 ));
473 }
474
475 let raw_key = RawDataKey::from_bytes(Cow::Borrowed(op.key.as_slice()));
476 let raw_row = rollback_rows.get(&raw_key).ok_or_else(|| {
477 InternalError::new(
478 ErrorClass::Internal,
479 ErrorOrigin::Store,
480 format!("commit marker data op missing rollback row ({})", E::PATH),
481 )
482 })?;
483 rollbacks.push(PreparedDataRollback {
484 key: raw_key,
485 value: raw_row.clone(),
486 });
487 }
488
489 Ok(rollbacks)
490 }
491
492 fn apply_marker_index_ops(
494 ops: &[CommitIndexOp],
495 stores: Vec<&'static LocalKey<RefCell<IndexStore>>>,
496 ) {
497 debug_assert_eq!(
498 ops.len(),
499 stores.len(),
500 "commit marker index ops length mismatch"
501 );
502
503 for (op, store) in ops.iter().zip(stores.into_iter()) {
504 debug_assert_eq!(op.key.len(), IndexKey::STORED_SIZE as usize);
505 let raw_key = RawIndexKey::from_bytes(Cow::Borrowed(op.key.as_slice()));
506
507 store.with_borrow_mut(|s| {
508 if let Some(value) = &op.value {
509 debug_assert!(value.len() <= MAX_INDEX_ENTRY_BYTES as usize);
510 let raw_entry = RawIndexEntry::from_bytes(Cow::Borrowed(value.as_slice()));
511 s.insert(raw_key, raw_entry);
512 } else {
513 s.remove(&raw_key);
514 }
515 });
516 }
517 }
518
519 fn apply_index_rollbacks(ops: Vec<PreparedIndexRollback>) {
521 for op in ops {
522 op.store.with_borrow_mut(|s| {
523 if let Some(value) = op.value {
524 s.insert(op.key, value);
525 } else {
526 s.remove(&op.key);
527 }
528 });
529 }
530 }
531
532 fn apply_marker_data_ops(
534 ops: &[CommitDataOp],
535 ctx: &Context<'_, E>,
536 ) -> Result<(), InternalError> {
537 for op in ops {
538 debug_assert!(op.value.is_none());
539 let raw_key = RawDataKey::from_bytes(Cow::Borrowed(op.key.as_slice()));
540 ctx.with_store_mut(|s| s.remove(&raw_key))?;
541 }
542 Ok(())
543 }
544
545 fn apply_data_rollbacks(db: Db<E::Canister>, ops: Vec<PreparedDataRollback>) {
547 let ctx = db.context::<E>();
548 for op in ops {
549 let _ = ctx.with_store_mut(|s| s.insert(op.key, op.value));
550 }
551 }
552
553 fn load_existing(&self, pk: E::PrimaryKey) -> Result<(DataKey, RawRow, E), InternalError> {
554 let dk = DataKey::new::<E>(pk.into());
555 let row = self.db.context::<E>().read_strict(&dk)?;
556 let entity = row.try_decode::<E>().map_err(|err| {
557 ExecutorError::corruption(
558 ErrorOrigin::Serialize,
559 format!("failed to deserialize row: {dk} ({err})"),
560 )
561 })?;
562 Ok((dk, row, entity))
563 }
564
565 fn build_index_plans(&self) -> Result<Vec<IndexPlan>, InternalError> {
566 E::INDEXES
567 .iter()
568 .map(|index| {
569 let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
570 Ok(IndexPlan { index, store })
571 })
572 .collect()
573 }
574
    /// Computes the index commit ops that remove `entities` from every index in `plans`.
    ///
    /// For each index, the stored entry for each distinct key is loaded once. The keys of the
    /// entities being deleted are removed from it in memory. One `CommitIndexOp` is then
    /// emitted per touched key, in key order: `value: None` when the entry became empty,
    /// otherwise the re-encoded remaining entry.
    ///
    /// Returns the ops together with the number of (index, entity) removals performed.
    ///
    /// # Errors
    /// - Returns a corruption error when a stored entry fails to decode, is missing, does not
    ///   contain the entity's key, or holds more than one key in a unique index.
    /// - Maps re-encoding failures to `Corruption` (too many keys) or `Unsupported` (key
    ///   encoding).
    /// - Propagates errors from `IndexKey::new`.
    #[expect(clippy::too_many_lines)]
    fn build_index_removal_ops(
        plans: &[IndexPlan],
        entities: &[&E],
    ) -> Result<(Vec<CommitIndexOp>, usize), InternalError> {
        let mut ops = Vec::new();
        let mut removed = 0usize;

        for plan in plans {
            // Human-readable field list, used only in error messages.
            let fields = plan.index.fields.join(", ");

            // Working copy of each touched entry for this index. `None` means "no entry",
            // either absent in the store or emptied by an earlier removal in this batch.
            let mut entries: BTreeMap<RawIndexKey, Option<IndexEntry>> = BTreeMap::new();

            for entity in entities {
                // Presumably `None` means the entity produces no key for this index — confirm.
                let Some(key) = IndexKey::new(*entity, plan.index)? else {
                    continue;
                };
                let raw_key = key.to_raw();
                let entity_key = entity.key();

                // Load and decode from the store the first time a key is seen; later entities
                // sharing the key see the in-memory state left by earlier removals.
                let entry = match entries.entry(raw_key) {
                    std::collections::btree_map::Entry::Vacant(slot) => {
                        let decoded = plan
                            .store
                            .with_borrow(|s| s.get(&raw_key))
                            .map(|raw| {
                                raw.try_decode().map_err(|err| {
                                    ExecutorError::corruption(
                                        ErrorOrigin::Index,
                                        format!(
                                            "index corrupted: {} ({}) -> {}",
                                            E::PATH,
                                            fields,
                                            err
                                        ),
                                    )
                                })
                            })
                            .transpose()?;
                        slot.insert(decoded)
                    }
                    std::collections::btree_map::Entry::Occupied(slot) => slot.into_mut(),
                };

                // An indexed entity must have an entry under its key.
                let Some(e) = entry.as_ref() else {
                    return Err(ExecutorError::corruption(
                        ErrorOrigin::Index,
                        format!(
                            "index corrupted: {} ({}) -> {}",
                            E::PATH,
                            fields,
                            IndexEntryCorruption::missing_key(raw_key, entity_key),
                        ),
                    )
                    .into());
                };

                if plan.index.unique && e.len() > 1 {
                    return Err(ExecutorError::corruption(
                        ErrorOrigin::Index,
                        format!(
                            "index corrupted: {} ({}) -> {}",
                            E::PATH,
                            fields,
                            IndexEntryCorruption::NonUniqueEntry { keys: e.len() },
                        ),
                    )
                    .into());
                }

                if !e.contains(&entity_key) {
                    return Err(ExecutorError::corruption(
                        ErrorOrigin::Index,
                        format!(
                            "index corrupted: {} ({}) -> {}",
                            E::PATH,
                            fields,
                            IndexEntryCorruption::missing_key(raw_key, entity_key),
                        ),
                    )
                    .into());
                }
                removed = removed.saturating_add(1);

                // Drop the entity from the entry; an emptied entry becomes a key removal.
                if let Some(e) = entry.as_mut() {
                    e.remove_key(&entity_key);
                    if e.is_empty() {
                        *entry = None;
                    }
                }
            }

            // Emit one op per touched key: re-encoded remainder, or `None` to delete the key.
            for (raw_key, entry) in entries {
                let value = if let Some(entry) = entry {
                    let raw = RawIndexEntry::try_from_entry(&entry).map_err(|err| match err {
                        crate::db::index::entry::IndexEntryEncodeError::TooManyKeys { keys } => {
                            InternalError::new(
                                ErrorClass::Corruption,
                                ErrorOrigin::Index,
                                format!(
                                    "index corrupted: {} ({}) -> {}",
                                    E::PATH,
                                    fields,
                                    IndexEntryCorruption::TooManyKeys { count: keys }
                                ),
                            )
                        }
                        crate::db::index::entry::IndexEntryEncodeError::KeyEncoding(err) => {
                            InternalError::new(
                                ErrorClass::Unsupported,
                                ErrorOrigin::Index,
                                format!(
                                    "index key encoding failed: {} ({fields}) -> {err}",
                                    E::PATH
                                ),
                            )
                        }
                    })?;
                    Some(raw.as_bytes().to_vec())
                } else {
                    None
                };

                ops.push(CommitIndexOp {
                    store: plan.index.store.to_string(),
                    key: raw_key.as_bytes().to_vec(),
                    value,
                });
            }
        }

        Ok((ops, removed))
    }
717}
718
719fn decode_rows<E: EntityKind>(rows: Vec<DataRow>) -> Result<Vec<DeleteRow<E>>, InternalError> {
720 rows.into_iter()
721 .map(|(dk, raw)| {
722 let dk_for_err = dk.clone();
723 let entity = raw.try_decode::<E>().map_err(|err| {
724 ExecutorError::corruption(
725 ErrorOrigin::Serialize,
726 format!("failed to deserialize row: {dk_for_err} ({err})"),
727 )
728 })?;
729
730 let expected = dk.key();
731 let actual = entity.key();
732 if expected != actual {
733 return Err(ExecutorError::corruption(
734 ErrorOrigin::Store,
735 format!("row key mismatch: expected {expected}, found {actual}"),
736 )
737 .into());
738 }
739
740 Ok(DeleteRow {
741 key: dk,
742 raw: Some(raw),
743 entity,
744 })
745 })
746 .collect()
747}