1use crate::{
2 db::{
3 CommitDataOp, CommitIndexOp, CommitKind, CommitMarker, Db, WriteUnit, begin_commit,
4 ensure_recovered,
5 executor::{
6 Context, ExecutorError, UniqueIndexHandle,
7 plan::{record_plan_metrics, set_rows_from_len},
8 resolve_unique_pk,
9 trace::{
10 QueryTraceSink, TraceAccess, TraceExecutorKind, TracePhase, start_exec_trace,
11 start_plan_trace,
12 },
13 },
14 finish_commit,
15 index::{
16 IndexEntry, IndexEntryCorruption, IndexKey, IndexStore, MAX_INDEX_ENTRY_BYTES,
17 RawIndexEntry, RawIndexKey,
18 },
19 query::plan::ExecutablePlan,
20 response::Response,
21 store::{DataKey, DataRow, RawDataKey, RawRow},
22 traits::FromKey,
23 },
24 error::{ErrorClass, ErrorOrigin, InternalError},
25 obs::sink::{self, ExecKind, MetricsEvent, Span},
26 prelude::*,
27 sanitize::sanitize,
28 traits::{EntityKind, Path, Storable},
29};
30use std::{
31 borrow::Cow, cell::RefCell, collections::BTreeMap, marker::PhantomData, thread::LocalKey,
32};
33
34struct IndexPlan {
40 index: &'static IndexModel,
41 store: &'static LocalKey<RefCell<IndexStore>>,
42}
43
44struct PreparedIndexRollback {
46 store: &'static LocalKey<RefCell<IndexStore>>,
47 key: RawIndexKey,
48 value: Option<RawIndexEntry>,
49}
50
51struct PreparedDataRollback {
53 key: RawDataKey,
54 value: RawRow,
55}
56
57struct DeleteRow<E> {
59 key: DataKey,
60 raw: Option<RawRow>,
61 entity: E,
62}
63
64impl<E: EntityKind> crate::db::query::plan::logical::PlanRow<E> for DeleteRow<E> {
65 fn entity(&self) -> &E {
66 &self.entity
67 }
68}
69
70#[derive(Clone, Copy)]
79pub struct DeleteExecutor<E: EntityKind> {
80 db: Db<E::Canister>,
81 debug: bool,
82 trace: Option<&'static dyn QueryTraceSink>,
83 _marker: PhantomData<E>,
84}
85
86impl<E: EntityKind> DeleteExecutor<E> {
87 #[must_use]
88 pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
89 Self {
90 db,
91 debug,
92 trace: None,
93 _marker: PhantomData,
94 }
95 }
96
97 #[must_use]
98 #[allow(dead_code)]
99 pub(crate) const fn with_trace_sink(
100 mut self,
101 sink: Option<&'static dyn QueryTraceSink>,
102 ) -> Self {
103 self.trace = sink;
104 self
105 }
106
107 #[must_use]
108 pub const fn debug(mut self) -> Self {
109 self.debug = true;
110 self
111 }
112
113 fn debug_log(&self, s: impl Into<String>) {
114 if self.debug {
115 println!("{}", s.into());
116 }
117 }
118
119 pub fn by_unique_index(
124 self,
125 index: UniqueIndexHandle,
126 entity: E,
127 ) -> Result<Response<E>, InternalError>
128 where
129 E::PrimaryKey: FromKey,
130 {
131 let trace = start_exec_trace(
132 self.trace,
133 TraceExecutorKind::Delete,
134 E::PATH,
135 Some(TraceAccess::UniqueIndex {
136 name: index.index().name,
137 }),
138 Some(index.index().name),
139 );
140 let result = (|| {
141 self.debug_log(format!(
142 "[debug] delete by unique index on {} ({})",
143 E::PATH,
144 index.index().fields.join(", ")
145 ));
146 let mut span = Span::<E>::new(ExecKind::Delete);
147 ensure_recovered(&self.db)?;
148
149 let index = index.index();
150 let mut lookup = entity;
151 sanitize(&mut lookup)?;
152
153 let Some(pk) = resolve_unique_pk::<E>(&self.db, index, &lookup)? else {
155 set_rows_from_len(&mut span, 0);
156 return Ok(Response(Vec::new()));
157 };
158
159 let (dk, stored_row, stored) = self.load_existing(pk)?;
161 let ctx = self.db.context::<E>();
162 let index_plans = self.build_index_plans()?;
163 let (index_ops, index_remove_count) =
164 Self::build_index_removal_ops(&index_plans, &[&stored])?;
165
166 ctx.with_store(|_| ())?;
168
169 let raw_key = dk.to_raw()?;
170 let marker = CommitMarker::new(
171 CommitKind::Delete,
172 index_ops,
173 vec![CommitDataOp {
174 store: E::Store::PATH.to_string(),
175 key: raw_key.as_bytes().to_vec(),
176 value: None,
177 }],
178 )?;
179 let (index_apply_stores, index_rollback_ops) =
180 Self::prepare_index_delete_ops(&index_plans, &marker.index_ops)?;
181 let mut rollback_rows = BTreeMap::new();
182 rollback_rows.insert(raw_key, stored_row);
183 let data_rollback_ops =
184 Self::prepare_data_delete_ops(&marker.data_ops, &rollback_rows)?;
185 let commit = begin_commit(marker)?;
186
187 finish_commit(commit, |guard| {
188 let mut unit = WriteUnit::new("delete_unique_row_atomic");
189
190 let index_rollback_ops = index_rollback_ops;
192 unit.record_rollback(move || Self::apply_index_rollbacks(index_rollback_ops));
193 Self::apply_marker_index_ops(&guard.marker.index_ops, index_apply_stores);
194 for _ in 0..index_remove_count {
195 sink::record(MetricsEvent::IndexRemove {
196 entity_path: E::PATH,
197 });
198 }
199
200 unit.checkpoint("delete_unique_after_indexes")?;
201
202 let data_rollback_ops = data_rollback_ops;
204 let db = self.db;
205 unit.record_rollback(move || Self::apply_data_rollbacks(db, data_rollback_ops));
206 unit.run(|| Self::apply_marker_data_ops(&guard.marker.data_ops, &ctx))?;
207
208 unit.checkpoint("delete_unique_after_data")?;
209 unit.commit();
210 Ok(())
211 })?;
212
213 set_rows_from_len(&mut span, 1);
214 Ok(Response(vec![(dk.key(), stored)]))
215 })();
216
217 if let Some(trace) = trace {
218 match &result {
219 Ok(resp) => trace.finish(u64::try_from(resp.0.len()).unwrap_or(u64::MAX)),
220 Err(err) => trace.error(err),
221 }
222 }
223
224 result
225 }
226
227 #[allow(clippy::too_many_lines)]
232 pub fn execute(self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
233 if !plan.mode().is_delete() {
234 return Err(InternalError::new(
235 ErrorClass::Unsupported,
236 ErrorOrigin::Query,
237 "delete executor requires delete plans".to_string(),
238 ));
239 }
240 let trace = start_plan_trace(self.trace, TraceExecutorKind::Delete, &plan);
241 let result = (|| {
242 let plan = plan.into_inner();
243 ensure_recovered(&self.db)?;
244
245 self.debug_log(format!("[debug] delete plan on {}", E::PATH));
246
247 let mut span = Span::<E>::new(ExecKind::Delete);
248 record_plan_metrics(&plan.access);
249
250 let ctx = self.db.context::<E>();
251 let data_rows = ctx.rows_from_access_plan(&plan.access, plan.consistency)?;
253 sink::record(MetricsEvent::RowsScanned {
254 entity_path: E::PATH,
255 rows_scanned: data_rows.len() as u64,
256 });
257
258 let mut rows = decode_rows::<E>(data_rows)?;
260 let access_rows = rows.len();
261
262 let stats = plan.apply_post_access::<E, _>(&mut rows)?;
264 if stats.delete_limited {
265 self.debug_log(format!(
266 "applied delete limit -> {} entities selected",
267 rows.len()
268 ));
269 }
270
271 if rows.is_empty() {
272 if let Some(trace) = trace.as_ref() {
273 let to_u64 = |len| u64::try_from(len).unwrap_or(u64::MAX);
274 trace.phase(TracePhase::Access, to_u64(access_rows));
275 trace.phase(TracePhase::Filter, to_u64(stats.rows_after_filter));
276 trace.phase(TracePhase::Order, to_u64(stats.rows_after_order));
277 trace.phase(
278 TracePhase::DeleteLimit,
279 to_u64(stats.rows_after_delete_limit),
280 );
281 }
282 set_rows_from_len(&mut span, 0);
283 return Ok(Response(Vec::new()));
284 }
285
286 let index_plans = self.build_index_plans()?;
287 let (index_ops, index_remove_count) = {
288 let entities: Vec<&E> = rows.iter().map(|row| &row.entity).collect();
289 Self::build_index_removal_ops(&index_plans, &entities)?
290 };
291
292 ctx.with_store(|_| ())?;
294
295 let mut rollback_rows = BTreeMap::new();
296 let data_ops = rows
297 .iter_mut()
298 .map(|row| {
299 let raw_key = row.key.to_raw()?;
300 let raw_row = row.raw.take().ok_or_else(|| {
301 InternalError::new(
302 ErrorClass::Internal,
303 ErrorOrigin::Store,
304 "missing raw row for delete rollback".to_string(),
305 )
306 })?;
307 rollback_rows.insert(raw_key, raw_row);
308 Ok(CommitDataOp {
309 store: E::Store::PATH.to_string(),
310 key: raw_key.as_bytes().to_vec(),
311 value: None,
312 })
313 })
314 .collect::<Result<Vec<_>, InternalError>>()?;
315
316 let marker = CommitMarker::new(CommitKind::Delete, index_ops, data_ops)?;
317 let (index_apply_stores, index_rollback_ops) =
318 Self::prepare_index_delete_ops(&index_plans, &marker.index_ops)?;
319 let data_rollback_ops =
320 Self::prepare_data_delete_ops(&marker.data_ops, &rollback_rows)?;
321 let commit = begin_commit(marker)?;
322
323 finish_commit(commit, |guard| {
324 let mut unit = WriteUnit::new("delete_rows_atomic");
325
326 let index_rollback_ops = index_rollback_ops;
328 unit.record_rollback(move || Self::apply_index_rollbacks(index_rollback_ops));
329 Self::apply_marker_index_ops(&guard.marker.index_ops, index_apply_stores);
330 for _ in 0..index_remove_count {
331 sink::record(MetricsEvent::IndexRemove {
332 entity_path: E::PATH,
333 });
334 }
335
336 unit.checkpoint("delete_after_indexes")?;
337
338 let data_rollback_ops = data_rollback_ops;
340 let db = self.db;
341 unit.record_rollback(move || Self::apply_data_rollbacks(db, data_rollback_ops));
342 unit.run(|| Self::apply_marker_data_ops(&guard.marker.data_ops, &ctx))?;
343
344 unit.checkpoint("delete_after_data")?;
345 unit.commit();
346
347 Ok(())
348 })?;
349
350 if let Some(trace) = trace.as_ref() {
352 let to_u64 = |len| u64::try_from(len).unwrap_or(u64::MAX);
353 trace.phase(TracePhase::Access, to_u64(access_rows));
354 trace.phase(TracePhase::Filter, to_u64(stats.rows_after_filter));
355 trace.phase(TracePhase::Order, to_u64(stats.rows_after_order));
356 trace.phase(
357 TracePhase::DeleteLimit,
358 to_u64(stats.rows_after_delete_limit),
359 );
360 }
361
362 let res = rows
363 .into_iter()
364 .map(|row| (row.key.key(), row.entity))
365 .collect::<Vec<_>>();
366 set_rows_from_len(&mut span, res.len());
367
368 Ok(Response(res))
369 })();
370
371 if let Some(trace) = trace {
372 match &result {
373 Ok(resp) => trace.finish(u64::try_from(resp.0.len()).unwrap_or(u64::MAX)),
374 Err(err) => trace.error(err),
375 }
376 }
377
378 result
379 }
380
381 #[expect(clippy::type_complexity)]
387 fn prepare_index_delete_ops(
388 plans: &[IndexPlan],
389 ops: &[CommitIndexOp],
390 ) -> Result<
391 (
392 Vec<&'static LocalKey<RefCell<IndexStore>>>,
393 Vec<PreparedIndexRollback>,
394 ),
395 InternalError,
396 > {
397 let mut stores = BTreeMap::new();
399 for plan in plans {
400 stores.insert(plan.index.store, plan.store);
401 }
402
403 let mut apply_stores = Vec::with_capacity(ops.len());
404 let mut rollbacks = Vec::with_capacity(ops.len());
405
406 for op in ops {
408 let store = stores.get(op.store.as_str()).ok_or_else(|| {
409 InternalError::new(
410 ErrorClass::Internal,
411 ErrorOrigin::Index,
412 format!(
413 "commit marker references unknown index store '{}' ({})",
414 op.store,
415 E::PATH
416 ),
417 )
418 })?;
419 if op.key.len() != IndexKey::STORED_SIZE as usize {
420 return Err(InternalError::new(
421 ErrorClass::Internal,
422 ErrorOrigin::Index,
423 format!(
424 "commit marker index key length {} does not match {} ({})",
425 op.key.len(),
426 IndexKey::STORED_SIZE,
427 E::PATH
428 ),
429 ));
430 }
431 if let Some(value) = &op.value
432 && value.len() > MAX_INDEX_ENTRY_BYTES as usize
433 {
434 return Err(InternalError::new(
435 ErrorClass::Internal,
436 ErrorOrigin::Index,
437 format!(
438 "commit marker index entry exceeds max size: {} bytes ({})",
439 value.len(),
440 E::PATH
441 ),
442 ));
443 }
444
445 let raw_key = RawIndexKey::from_bytes(Cow::Borrowed(op.key.as_slice()));
446 let rollback_value = store.with_borrow(|s| s.get(&raw_key)).ok_or_else(|| {
447 InternalError::new(
448 ErrorClass::Internal,
449 ErrorOrigin::Index,
450 format!(
451 "commit marker index op missing entry before delete: {} ({})",
452 op.store,
453 E::PATH
454 ),
455 )
456 })?;
457
458 apply_stores.push(*store);
459 rollbacks.push(PreparedIndexRollback {
460 store,
461 key: raw_key,
462 value: Some(rollback_value),
463 });
464 }
465
466 Ok((apply_stores, rollbacks))
467 }
468
469 fn prepare_data_delete_ops(
471 ops: &[CommitDataOp],
472 rollback_rows: &BTreeMap<RawDataKey, RawRow>,
473 ) -> Result<Vec<PreparedDataRollback>, InternalError> {
474 let mut rollbacks = Vec::with_capacity(ops.len());
475
476 for op in ops {
478 if op.store != E::Store::PATH {
479 return Err(InternalError::new(
480 ErrorClass::Internal,
481 ErrorOrigin::Store,
482 format!(
483 "commit marker references unexpected data store '{}' ({})",
484 op.store,
485 E::PATH
486 ),
487 ));
488 }
489 if op.key.len() != DataKey::STORED_SIZE as usize {
490 return Err(InternalError::new(
491 ErrorClass::Internal,
492 ErrorOrigin::Store,
493 format!(
494 "commit marker data key length {} does not match {} ({})",
495 op.key.len(),
496 DataKey::STORED_SIZE,
497 E::PATH
498 ),
499 ));
500 }
501 if op.value.is_some() {
502 return Err(InternalError::new(
503 ErrorClass::Internal,
504 ErrorOrigin::Store,
505 format!("commit marker delete includes data payload ({})", E::PATH),
506 ));
507 }
508
509 let raw_key = RawDataKey::from_bytes(Cow::Borrowed(op.key.as_slice()));
510 let raw_row = rollback_rows.get(&raw_key).ok_or_else(|| {
511 InternalError::new(
512 ErrorClass::Internal,
513 ErrorOrigin::Store,
514 format!("commit marker data op missing rollback row ({})", E::PATH),
515 )
516 })?;
517 rollbacks.push(PreparedDataRollback {
518 key: raw_key,
519 value: raw_row.clone(),
520 });
521 }
522
523 Ok(rollbacks)
524 }
525
526 fn apply_marker_index_ops(
528 ops: &[CommitIndexOp],
529 stores: Vec<&'static LocalKey<RefCell<IndexStore>>>,
530 ) {
531 debug_assert_eq!(
532 ops.len(),
533 stores.len(),
534 "commit marker index ops length mismatch"
535 );
536
537 for (op, store) in ops.iter().zip(stores.into_iter()) {
538 debug_assert_eq!(op.key.len(), IndexKey::STORED_SIZE as usize);
539 let raw_key = RawIndexKey::from_bytes(Cow::Borrowed(op.key.as_slice()));
540
541 store.with_borrow_mut(|s| {
542 if let Some(value) = &op.value {
543 debug_assert!(value.len() <= MAX_INDEX_ENTRY_BYTES as usize);
544 let raw_entry = RawIndexEntry::from_bytes(Cow::Borrowed(value.as_slice()));
545 s.insert(raw_key, raw_entry);
546 } else {
547 s.remove(&raw_key);
548 }
549 });
550 }
551 }
552
553 fn apply_index_rollbacks(ops: Vec<PreparedIndexRollback>) {
555 for op in ops {
556 op.store.with_borrow_mut(|s| {
557 if let Some(value) = op.value {
558 s.insert(op.key, value);
559 } else {
560 s.remove(&op.key);
561 }
562 });
563 }
564 }
565
566 fn apply_marker_data_ops(
568 ops: &[CommitDataOp],
569 ctx: &Context<'_, E>,
570 ) -> Result<(), InternalError> {
571 for op in ops {
572 debug_assert!(op.value.is_none());
573 let raw_key = RawDataKey::from_bytes(Cow::Borrowed(op.key.as_slice()));
574 ctx.with_store_mut(|s| s.remove(&raw_key))?;
575 }
576 Ok(())
577 }
578
579 fn apply_data_rollbacks(db: Db<E::Canister>, ops: Vec<PreparedDataRollback>) {
581 let ctx = db.context::<E>();
582 for op in ops {
583 let _ = ctx.with_store_mut(|s| s.insert(op.key, op.value));
584 }
585 }
586
587 fn load_existing(&self, pk: E::PrimaryKey) -> Result<(DataKey, RawRow, E), InternalError> {
588 let dk = DataKey::new::<E>(pk.into());
589 let row = self.db.context::<E>().read_strict(&dk)?;
590 let entity = row.try_decode::<E>().map_err(|err| {
591 ExecutorError::corruption(
592 ErrorOrigin::Serialize,
593 format!("failed to deserialize row: {dk} ({err})"),
594 )
595 })?;
596 Ok((dk, row, entity))
597 }
598
599 fn build_index_plans(&self) -> Result<Vec<IndexPlan>, InternalError> {
600 E::INDEXES
601 .iter()
602 .map(|index| {
603 let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
604 Ok(IndexPlan { index, store })
605 })
606 .collect()
607 }
608
609 #[expect(clippy::too_many_lines)]
611 fn build_index_removal_ops(
612 plans: &[IndexPlan],
613 entities: &[&E],
614 ) -> Result<(Vec<CommitIndexOp>, usize), InternalError> {
615 let mut ops = Vec::new();
616 let mut removed = 0usize;
617
618 for plan in plans {
620 let fields = plan.index.fields.join(", ");
621
622 let mut entries: BTreeMap<RawIndexKey, Option<IndexEntry>> = BTreeMap::new();
624
625 for entity in entities {
627 let Some(key) = IndexKey::new(*entity, plan.index)? else {
628 continue;
629 };
630 let raw_key = key.to_raw();
631 let entity_key = entity.key();
632
633 let entry = match entries.entry(raw_key) {
635 std::collections::btree_map::Entry::Vacant(slot) => {
636 let decoded = plan
637 .store
638 .with_borrow(|s| s.get(&raw_key))
639 .map(|raw| {
640 raw.try_decode().map_err(|err| {
641 ExecutorError::corruption(
642 ErrorOrigin::Index,
643 format!(
644 "index corrupted: {} ({}) -> {}",
645 E::PATH,
646 fields,
647 err
648 ),
649 )
650 })
651 })
652 .transpose()?;
653 slot.insert(decoded)
654 }
655 std::collections::btree_map::Entry::Occupied(slot) => slot.into_mut(),
656 };
657
658 let Some(e) = entry.as_ref() else {
660 return Err(ExecutorError::corruption(
661 ErrorOrigin::Index,
662 format!(
663 "index corrupted: {} ({}) -> {}",
664 E::PATH,
665 fields,
666 IndexEntryCorruption::missing_key(raw_key, entity_key),
667 ),
668 )
669 .into());
670 };
671
672 if plan.index.unique && e.len() > 1 {
673 return Err(ExecutorError::corruption(
674 ErrorOrigin::Index,
675 format!(
676 "index corrupted: {} ({}) -> {}",
677 E::PATH,
678 fields,
679 IndexEntryCorruption::NonUniqueEntry { keys: e.len() },
680 ),
681 )
682 .into());
683 }
684
685 if !e.contains(&entity_key) {
686 return Err(ExecutorError::corruption(
687 ErrorOrigin::Index,
688 format!(
689 "index corrupted: {} ({}) -> {}",
690 E::PATH,
691 fields,
692 IndexEntryCorruption::missing_key(raw_key, entity_key),
693 ),
694 )
695 .into());
696 }
697 removed = removed.saturating_add(1);
698
699 if let Some(e) = entry.as_mut() {
701 e.remove_key(&entity_key);
702 if e.is_empty() {
703 *entry = None;
704 }
705 }
706 }
707
708 for (raw_key, entry) in entries {
710 let value = if let Some(entry) = entry {
711 let raw = RawIndexEntry::try_from_entry(&entry).map_err(|err| match err {
712 crate::db::index::entry::IndexEntryEncodeError::TooManyKeys { keys } => {
713 InternalError::new(
714 ErrorClass::Corruption,
715 ErrorOrigin::Index,
716 format!(
717 "index corrupted: {} ({}) -> {}",
718 E::PATH,
719 fields,
720 IndexEntryCorruption::TooManyKeys { count: keys }
721 ),
722 )
723 }
724 crate::db::index::entry::IndexEntryEncodeError::KeyEncoding(err) => {
725 InternalError::new(
726 ErrorClass::Unsupported,
727 ErrorOrigin::Index,
728 format!(
729 "index key encoding failed: {} ({fields}) -> {err}",
730 E::PATH
731 ),
732 )
733 }
734 })?;
735 Some(raw.as_bytes().to_vec())
736 } else {
737 None
739 };
740
741 ops.push(CommitIndexOp {
742 store: plan.index.store.to_string(),
743 key: raw_key.as_bytes().to_vec(),
744 value,
745 });
746 }
747 }
748
749 Ok((ops, removed))
750 }
751}
752
753fn decode_rows<E: EntityKind>(rows: Vec<DataRow>) -> Result<Vec<DeleteRow<E>>, InternalError> {
754 rows.into_iter()
755 .map(|(dk, raw)| {
756 let dk_for_err = dk.clone();
757 let entity = raw.try_decode::<E>().map_err(|err| {
758 ExecutorError::corruption(
759 ErrorOrigin::Serialize,
760 format!("failed to deserialize row: {dk_for_err} ({err})"),
761 )
762 })?;
763
764 let expected = dk.key();
765 let actual = entity.key();
766 if expected != actual {
767 return Err(ExecutorError::corruption(
768 ErrorOrigin::Store,
769 format!("row key mismatch: expected {expected}, found {actual}"),
770 )
771 .into());
772 }
773
774 Ok(DeleteRow {
775 key: dk,
776 raw: Some(raw),
777 entity,
778 })
779 })
780 .collect()
781}