Skip to main content

icydb_core/db/executor/
delete.rs

1use crate::{
2    db::{
3        CommitDataOp, CommitIndexOp, CommitKind, CommitMarker, Db, WriteUnit, begin_commit,
4        ensure_recovered,
5        executor::{
6            Context, ExecutorError, UniqueIndexHandle,
7            plan::{record_plan_metrics, set_rows_from_len},
8            resolve_unique_pk,
9            trace::{
10                QueryTraceSink, TraceAccess, TraceExecutorKind, start_exec_trace, start_plan_trace,
11            },
12        },
13        finish_commit,
14        index::{
15            IndexEntry, IndexEntryCorruption, IndexKey, IndexStore, MAX_INDEX_ENTRY_BYTES,
16            RawIndexEntry, RawIndexKey,
17        },
18        query::plan::ExecutablePlan,
19        response::Response,
20        store::{DataKey, DataRow, RawDataKey, RawRow},
21        traits::FromKey,
22    },
23    error::{ErrorClass, ErrorOrigin, InternalError},
24    obs::sink::{self, ExecKind, MetricsEvent, Span},
25    prelude::*,
26    sanitize::sanitize,
27    traits::{EntityKind, Path, Storable},
28};
29use std::{
30    borrow::Cow, cell::RefCell, collections::BTreeMap, marker::PhantomData, thread::LocalKey,
31};
32
33///
34/// IndexPlan
35/// Prevalidated handle to an index store used during commit planning.
36///
37
38struct IndexPlan {
39    index: &'static IndexModel,
40    store: &'static LocalKey<RefCell<IndexStore>>,
41}
42
43// Prevalidated rollback mutation for index entries.
44struct PreparedIndexRollback {
45    store: &'static LocalKey<RefCell<IndexStore>>,
46    key: RawIndexKey,
47    value: Option<RawIndexEntry>,
48}
49
50// Prevalidated rollback mutation for data rows.
51struct PreparedDataRollback {
52    key: RawDataKey,
53    value: RawRow,
54}
55
56// Row wrapper used during delete planning and execution.
57struct DeleteRow<E> {
58    key: DataKey,
59    raw: Option<RawRow>,
60    entity: E,
61}
62
63impl<E: EntityKind> crate::db::query::plan::logical::PlanRow<E> for DeleteRow<E> {
64    fn entity(&self) -> &E {
65        &self.entity
66    }
67}
68
69///
70/// DeleteExecutor
71///
72/// Stage-1 atomicity invariant:
73/// All fallible validation completes before the first stable write.
74/// Mutations run inside a WriteUnit so mid-flight failures roll back
75/// before the commit marker is cleared.
76///
77#[derive(Clone, Copy)]
78pub struct DeleteExecutor<E: EntityKind> {
79    db: Db<E::Canister>,
80    debug: bool,
81    trace: Option<&'static dyn QueryTraceSink>,
82    _marker: PhantomData<E>,
83}
84
85impl<E: EntityKind> DeleteExecutor<E> {
86    #[must_use]
87    pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
88        Self {
89            db,
90            debug,
91            trace: None,
92            _marker: PhantomData,
93        }
94    }
95
96    #[must_use]
97    #[allow(dead_code)]
98    pub(crate) const fn with_trace_sink(
99        mut self,
100        sink: Option<&'static dyn QueryTraceSink>,
101    ) -> Self {
102        self.trace = sink;
103        self
104    }
105
106    #[must_use]
107    pub const fn debug(mut self) -> Self {
108        self.debug = true;
109        self
110    }
111
112    fn debug_log(&self, s: impl Into<String>) {
113        if self.debug {
114            println!("{}", s.into());
115        }
116    }
117
118    // ─────────────────────────────────────────────
119    // Unique-index delete
120    // ─────────────────────────────────────────────
121
122    pub fn by_unique_index(
123        self,
124        index: UniqueIndexHandle,
125        entity: E,
126    ) -> Result<Response<E>, InternalError>
127    where
128        E::PrimaryKey: FromKey,
129    {
130        let trace = start_exec_trace(
131            self.trace,
132            TraceExecutorKind::Delete,
133            E::PATH,
134            Some(TraceAccess::UniqueIndex {
135                name: index.index().name,
136            }),
137            Some(index.index().name),
138        );
139        let result = (|| {
140            self.debug_log(format!(
141                "[debug] delete by unique index on {} ({})",
142                E::PATH,
143                index.index().fields.join(", ")
144            ));
145            let mut span = Span::<E>::new(ExecKind::Delete);
146            ensure_recovered(&self.db)?;
147
148            let index = index.index();
149            let mut lookup = entity;
150            sanitize(&mut lookup)?;
151
152            // Resolve PK via unique index; absence is a no-op.
153            let Some(pk) = resolve_unique_pk::<E>(&self.db, index, &lookup)? else {
154                set_rows_from_len(&mut span, 0);
155                return Ok(Response(Vec::new()));
156            };
157
158            // Intentional re-decode for defense-in-depth; resolve_unique_pk only returns the key.
159            let (dk, stored_row, stored) = self.load_existing(pk)?;
160            let ctx = self.db.context::<E>();
161            let index_plans = self.build_index_plans()?;
162            let (index_ops, index_remove_count) =
163                Self::build_index_removal_ops(&index_plans, &[&stored])?;
164
165            // Preflight: ensure stores are accessible before committing.
166            ctx.with_store(|_| ())?;
167
168            let raw_key = dk.to_raw()?;
169            let marker = CommitMarker::new(
170                CommitKind::Delete,
171                index_ops,
172                vec![CommitDataOp {
173                    store: E::Store::PATH.to_string(),
174                    key: raw_key.as_bytes().to_vec(),
175                    value: None,
176                }],
177            )?;
178            let (index_apply_stores, index_rollback_ops) =
179                Self::prepare_index_delete_ops(&index_plans, &marker.index_ops)?;
180            let mut rollback_rows = BTreeMap::new();
181            rollback_rows.insert(raw_key, stored_row);
182            let data_rollback_ops =
183                Self::prepare_data_delete_ops(&marker.data_ops, &rollback_rows)?;
184            let commit = begin_commit(marker)?;
185
186            finish_commit(commit, |guard| {
187                let mut unit = WriteUnit::new("delete_unique_row_atomic");
188
189                // Commit boundary: apply the marker's raw mutations mechanically.
190                let index_rollback_ops = index_rollback_ops;
191                unit.record_rollback(move || Self::apply_index_rollbacks(index_rollback_ops));
192                Self::apply_marker_index_ops(&guard.marker.index_ops, index_apply_stores);
193                for _ in 0..index_remove_count {
194                    sink::record(MetricsEvent::IndexRemove {
195                        entity_path: E::PATH,
196                    });
197                }
198
199                unit.checkpoint("delete_unique_after_indexes")?;
200
201                // Apply data mutations recorded in the marker.
202                let data_rollback_ops = data_rollback_ops;
203                let db = self.db;
204                unit.record_rollback(move || Self::apply_data_rollbacks(db, data_rollback_ops));
205                unit.run(|| Self::apply_marker_data_ops(&guard.marker.data_ops, &ctx))?;
206
207                unit.checkpoint("delete_unique_after_data")?;
208                unit.commit();
209                Ok(())
210            })?;
211
212            set_rows_from_len(&mut span, 1);
213            Ok(Response(vec![(dk.key(), stored)]))
214        })();
215
216        if let Some(trace) = trace {
217            match &result {
218                Ok(resp) => trace.finish(u64::try_from(resp.0.len()).unwrap_or(u64::MAX)),
219                Err(err) => trace.error(err),
220            }
221        }
222
223        result
224    }
225
226    // ─────────────────────────────────────────────
227    // Plan-based delete
228    // ─────────────────────────────────────────────
229
230    #[allow(clippy::too_many_lines)]
231    pub fn execute(self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
232        let trace = start_plan_trace(self.trace, TraceExecutorKind::Delete, &plan);
233        let result = (|| {
234            let plan = plan.into_inner();
235            ensure_recovered(&self.db)?;
236
237            self.debug_log(format!("[debug] delete plan on {}", E::PATH));
238
239            let mut span = Span::<E>::new(ExecKind::Delete);
240            record_plan_metrics(&plan.access);
241
242            let ctx = self.db.context::<E>();
243            let data_rows = ctx.rows_from_access_plan(&plan.access, plan.consistency)?;
244            sink::record(MetricsEvent::RowsScanned {
245                entity_path: E::PATH,
246                rows_scanned: data_rows.len() as u64,
247            });
248
249            let mut rows = decode_rows::<E>(data_rows)?;
250
251            let stats = plan.apply_post_access::<E, _>(&mut rows)?;
252            if stats.delete_limited {
253                self.debug_log(format!(
254                    "applied delete limit -> {} entities selected",
255                    rows.len()
256                ));
257            }
258
259            if rows.is_empty() {
260                set_rows_from_len(&mut span, 0);
261                return Ok(Response(Vec::new()));
262            }
263
264            let index_plans = self.build_index_plans()?;
265            let (index_ops, index_remove_count) = {
266                let entities: Vec<&E> = rows.iter().map(|row| &row.entity).collect();
267                Self::build_index_removal_ops(&index_plans, &entities)?
268            };
269
270            // Preflight store access to ensure no fallible work remains post-commit.
271            ctx.with_store(|_| ())?;
272
273            let mut rollback_rows = BTreeMap::new();
274            let data_ops = rows
275                .iter_mut()
276                .map(|row| {
277                    let raw_key = row.key.to_raw()?;
278                    let raw_row = row.raw.take().ok_or_else(|| {
279                        InternalError::new(
280                            ErrorClass::Internal,
281                            ErrorOrigin::Store,
282                            "missing raw row for delete rollback".to_string(),
283                        )
284                    })?;
285                    rollback_rows.insert(raw_key, raw_row);
286                    Ok(CommitDataOp {
287                        store: E::Store::PATH.to_string(),
288                        key: raw_key.as_bytes().to_vec(),
289                        value: None,
290                    })
291                })
292                .collect::<Result<Vec<_>, InternalError>>()?;
293
294            let marker = CommitMarker::new(CommitKind::Delete, index_ops, data_ops)?;
295            let (index_apply_stores, index_rollback_ops) =
296                Self::prepare_index_delete_ops(&index_plans, &marker.index_ops)?;
297            let data_rollback_ops =
298                Self::prepare_data_delete_ops(&marker.data_ops, &rollback_rows)?;
299            let commit = begin_commit(marker)?;
300
301            finish_commit(commit, |guard| {
302                let mut unit = WriteUnit::new("delete_rows_atomic");
303
304                // Commit boundary: apply the marker's raw mutations mechanically.
305                let index_rollback_ops = index_rollback_ops;
306                unit.record_rollback(move || Self::apply_index_rollbacks(index_rollback_ops));
307                Self::apply_marker_index_ops(&guard.marker.index_ops, index_apply_stores);
308                for _ in 0..index_remove_count {
309                    sink::record(MetricsEvent::IndexRemove {
310                        entity_path: E::PATH,
311                    });
312                }
313
314                unit.checkpoint("delete_after_indexes")?;
315
316                // Apply data mutations recorded in the marker.
317                let data_rollback_ops = data_rollback_ops;
318                let db = self.db;
319                unit.record_rollback(move || Self::apply_data_rollbacks(db, data_rollback_ops));
320                unit.run(|| Self::apply_marker_data_ops(&guard.marker.data_ops, &ctx))?;
321
322                unit.checkpoint("delete_after_data")?;
323                unit.commit();
324
325                Ok(())
326            })?;
327
328            let res = rows
329                .into_iter()
330                .map(|row| (row.key.key(), row.entity))
331                .collect::<Vec<_>>();
332            set_rows_from_len(&mut span, res.len());
333
334            Ok(Response(res))
335        })();
336
337        if let Some(trace) = trace {
338            match &result {
339                Ok(resp) => trace.finish(u64::try_from(resp.0.len()).unwrap_or(u64::MAX)),
340                Err(err) => trace.error(err),
341            }
342        }
343
344        result
345    }
346
347    // ─────────────────────────────────────────────
348    // Helpers
349    // ─────────────────────────────────────────────
350
351    // Resolve commit marker index ops into stores and rollback bytes before committing.
352    #[expect(clippy::type_complexity)]
353    fn prepare_index_delete_ops(
354        plans: &[IndexPlan],
355        ops: &[CommitIndexOp],
356    ) -> Result<
357        (
358            Vec<&'static LocalKey<RefCell<IndexStore>>>,
359            Vec<PreparedIndexRollback>,
360        ),
361        InternalError,
362    > {
363        // Resolve store handles once so commit-time apply is mechanical.
364        let mut stores = BTreeMap::new();
365        for plan in plans {
366            stores.insert(plan.index.store, plan.store);
367        }
368
369        let mut apply_stores = Vec::with_capacity(ops.len());
370        let mut rollbacks = Vec::with_capacity(ops.len());
371
372        // Prevalidate commit ops and capture rollback bytes from current state.
373        for op in ops {
374            let store = stores.get(op.store.as_str()).ok_or_else(|| {
375                InternalError::new(
376                    ErrorClass::Internal,
377                    ErrorOrigin::Index,
378                    format!(
379                        "commit marker references unknown index store '{}' ({})",
380                        op.store,
381                        E::PATH
382                    ),
383                )
384            })?;
385            if op.key.len() != IndexKey::STORED_SIZE as usize {
386                return Err(InternalError::new(
387                    ErrorClass::Internal,
388                    ErrorOrigin::Index,
389                    format!(
390                        "commit marker index key length {} does not match {} ({})",
391                        op.key.len(),
392                        IndexKey::STORED_SIZE,
393                        E::PATH
394                    ),
395                ));
396            }
397            if let Some(value) = &op.value
398                && value.len() > MAX_INDEX_ENTRY_BYTES as usize
399            {
400                return Err(InternalError::new(
401                    ErrorClass::Internal,
402                    ErrorOrigin::Index,
403                    format!(
404                        "commit marker index entry exceeds max size: {} bytes ({})",
405                        value.len(),
406                        E::PATH
407                    ),
408                ));
409            }
410
411            let raw_key = RawIndexKey::from_bytes(Cow::Borrowed(op.key.as_slice()));
412            let rollback_value = store.with_borrow(|s| s.get(&raw_key)).ok_or_else(|| {
413                InternalError::new(
414                    ErrorClass::Internal,
415                    ErrorOrigin::Index,
416                    format!(
417                        "commit marker index op missing entry before delete: {} ({})",
418                        op.store,
419                        E::PATH
420                    ),
421                )
422            })?;
423
424            apply_stores.push(*store);
425            rollbacks.push(PreparedIndexRollback {
426                store,
427                key: raw_key,
428                value: Some(rollback_value),
429            });
430        }
431
432        Ok((apply_stores, rollbacks))
433    }
434
435    // Resolve commit marker data ops and capture rollback rows before committing.
436    fn prepare_data_delete_ops(
437        ops: &[CommitDataOp],
438        rollback_rows: &BTreeMap<RawDataKey, RawRow>,
439    ) -> Result<Vec<PreparedDataRollback>, InternalError> {
440        let mut rollbacks = Vec::with_capacity(ops.len());
441
442        // Validate marker ops and map them to rollback rows.
443        for op in ops {
444            if op.store != E::Store::PATH {
445                return Err(InternalError::new(
446                    ErrorClass::Internal,
447                    ErrorOrigin::Store,
448                    format!(
449                        "commit marker references unexpected data store '{}' ({})",
450                        op.store,
451                        E::PATH
452                    ),
453                ));
454            }
455            if op.key.len() != DataKey::STORED_SIZE as usize {
456                return Err(InternalError::new(
457                    ErrorClass::Internal,
458                    ErrorOrigin::Store,
459                    format!(
460                        "commit marker data key length {} does not match {} ({})",
461                        op.key.len(),
462                        DataKey::STORED_SIZE,
463                        E::PATH
464                    ),
465                ));
466            }
467            if op.value.is_some() {
468                return Err(InternalError::new(
469                    ErrorClass::Internal,
470                    ErrorOrigin::Store,
471                    format!("commit marker delete includes data payload ({})", E::PATH),
472                ));
473            }
474
475            let raw_key = RawDataKey::from_bytes(Cow::Borrowed(op.key.as_slice()));
476            let raw_row = rollback_rows.get(&raw_key).ok_or_else(|| {
477                InternalError::new(
478                    ErrorClass::Internal,
479                    ErrorOrigin::Store,
480                    format!("commit marker data op missing rollback row ({})", E::PATH),
481                )
482            })?;
483            rollbacks.push(PreparedDataRollback {
484                key: raw_key,
485                value: raw_row.clone(),
486            });
487        }
488
489        Ok(rollbacks)
490    }
491
492    // Apply commit marker index ops using pre-resolved stores.
493    fn apply_marker_index_ops(
494        ops: &[CommitIndexOp],
495        stores: Vec<&'static LocalKey<RefCell<IndexStore>>>,
496    ) {
497        debug_assert_eq!(
498            ops.len(),
499            stores.len(),
500            "commit marker index ops length mismatch"
501        );
502
503        for (op, store) in ops.iter().zip(stores.into_iter()) {
504            debug_assert_eq!(op.key.len(), IndexKey::STORED_SIZE as usize);
505            let raw_key = RawIndexKey::from_bytes(Cow::Borrowed(op.key.as_slice()));
506
507            store.with_borrow_mut(|s| {
508                if let Some(value) = &op.value {
509                    debug_assert!(value.len() <= MAX_INDEX_ENTRY_BYTES as usize);
510                    let raw_entry = RawIndexEntry::from_bytes(Cow::Borrowed(value.as_slice()));
511                    s.insert(raw_key, raw_entry);
512                } else {
513                    s.remove(&raw_key);
514                }
515            });
516        }
517    }
518
519    // Apply rollback mutations for index entries using raw bytes.
520    fn apply_index_rollbacks(ops: Vec<PreparedIndexRollback>) {
521        for op in ops {
522            op.store.with_borrow_mut(|s| {
523                if let Some(value) = op.value {
524                    s.insert(op.key, value);
525                } else {
526                    s.remove(&op.key);
527                }
528            });
529        }
530    }
531
532    // Apply commit marker data deletes using raw keys only.
533    fn apply_marker_data_ops(
534        ops: &[CommitDataOp],
535        ctx: &Context<'_, E>,
536    ) -> Result<(), InternalError> {
537        for op in ops {
538            debug_assert!(op.value.is_none());
539            let raw_key = RawDataKey::from_bytes(Cow::Borrowed(op.key.as_slice()));
540            ctx.with_store_mut(|s| s.remove(&raw_key))?;
541        }
542        Ok(())
543    }
544
545    // Apply rollback mutations for data rows.
546    fn apply_data_rollbacks(db: Db<E::Canister>, ops: Vec<PreparedDataRollback>) {
547        let ctx = db.context::<E>();
548        for op in ops {
549            let _ = ctx.with_store_mut(|s| s.insert(op.key, op.value));
550        }
551    }
552
553    fn load_existing(&self, pk: E::PrimaryKey) -> Result<(DataKey, RawRow, E), InternalError> {
554        let dk = DataKey::new::<E>(pk.into());
555        let row = self.db.context::<E>().read_strict(&dk)?;
556        let entity = row.try_decode::<E>().map_err(|err| {
557            ExecutorError::corruption(
558                ErrorOrigin::Serialize,
559                format!("failed to deserialize row: {dk} ({err})"),
560            )
561        })?;
562        Ok((dk, row, entity))
563    }
564
565    fn build_index_plans(&self) -> Result<Vec<IndexPlan>, InternalError> {
566        E::INDEXES
567            .iter()
568            .map(|index| {
569                let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
570                Ok(IndexPlan { index, store })
571            })
572            .collect()
573    }
574
575    // Build commit-time index ops and count entity-level removals for metrics.
576    #[expect(clippy::too_many_lines)]
577    fn build_index_removal_ops(
578        plans: &[IndexPlan],
579        entities: &[&E],
580    ) -> Result<(Vec<CommitIndexOp>, usize), InternalError> {
581        let mut ops = Vec::new();
582        let mut removed = 0usize;
583
584        // Process each index independently to compute its resulting mutations.
585        for plan in plans {
586            let fields = plan.index.fields.join(", ");
587
588            // Map raw index keys → updated entry (or None if fully removed).
589            let mut entries: BTreeMap<RawIndexKey, Option<IndexEntry>> = BTreeMap::new();
590
591            // Fold entity deletions into per-key index entry updates.
592            for entity in entities {
593                let Some(key) = IndexKey::new(*entity, plan.index)? else {
594                    continue;
595                };
596                let raw_key = key.to_raw();
597                let entity_key = entity.key();
598
599                // Lazily load and decode the existing index entry once per key.
600                let entry = match entries.entry(raw_key) {
601                    std::collections::btree_map::Entry::Vacant(slot) => {
602                        let decoded = plan
603                            .store
604                            .with_borrow(|s| s.get(&raw_key))
605                            .map(|raw| {
606                                raw.try_decode().map_err(|err| {
607                                    ExecutorError::corruption(
608                                        ErrorOrigin::Index,
609                                        format!(
610                                            "index corrupted: {} ({}) -> {}",
611                                            E::PATH,
612                                            fields,
613                                            err
614                                        ),
615                                    )
616                                })
617                            })
618                            .transpose()?;
619                        slot.insert(decoded)
620                    }
621                    std::collections::btree_map::Entry::Occupied(slot) => slot.into_mut(),
622                };
623
624                // Prevalidate membership to keep commit-phase mutations infallible.
625                let Some(e) = entry.as_ref() else {
626                    return Err(ExecutorError::corruption(
627                        ErrorOrigin::Index,
628                        format!(
629                            "index corrupted: {} ({}) -> {}",
630                            E::PATH,
631                            fields,
632                            IndexEntryCorruption::missing_key(raw_key, entity_key),
633                        ),
634                    )
635                    .into());
636                };
637
638                if plan.index.unique && e.len() > 1 {
639                    return Err(ExecutorError::corruption(
640                        ErrorOrigin::Index,
641                        format!(
642                            "index corrupted: {} ({}) -> {}",
643                            E::PATH,
644                            fields,
645                            IndexEntryCorruption::NonUniqueEntry { keys: e.len() },
646                        ),
647                    )
648                    .into());
649                }
650
651                if !e.contains(&entity_key) {
652                    return Err(ExecutorError::corruption(
653                        ErrorOrigin::Index,
654                        format!(
655                            "index corrupted: {} ({}) -> {}",
656                            E::PATH,
657                            fields,
658                            IndexEntryCorruption::missing_key(raw_key, entity_key),
659                        ),
660                    )
661                    .into());
662                }
663                removed = removed.saturating_add(1);
664
665                // Remove this entity’s key from the index entry.
666                if let Some(e) = entry.as_mut() {
667                    e.remove_key(&entity_key);
668                    if e.is_empty() {
669                        *entry = None;
670                    }
671                }
672            }
673
674            // Emit commit ops for each touched index key.
675            for (raw_key, entry) in entries {
676                let value = if let Some(entry) = entry {
677                    let raw = RawIndexEntry::try_from_entry(&entry).map_err(|err| match err {
678                        crate::db::index::entry::IndexEntryEncodeError::TooManyKeys { keys } => {
679                            InternalError::new(
680                                ErrorClass::Corruption,
681                                ErrorOrigin::Index,
682                                format!(
683                                    "index corrupted: {} ({}) -> {}",
684                                    E::PATH,
685                                    fields,
686                                    IndexEntryCorruption::TooManyKeys { count: keys }
687                                ),
688                            )
689                        }
690                        crate::db::index::entry::IndexEntryEncodeError::KeyEncoding(err) => {
691                            InternalError::new(
692                                ErrorClass::Unsupported,
693                                ErrorOrigin::Index,
694                                format!(
695                                    "index key encoding failed: {} ({fields}) -> {err}",
696                                    E::PATH
697                                ),
698                            )
699                        }
700                    })?;
701                    Some(raw.as_bytes().to_vec())
702                } else {
703                    // None means the index entry is fully removed.
704                    None
705                };
706
707                ops.push(CommitIndexOp {
708                    store: plan.index.store.to_string(),
709                    key: raw_key.as_bytes().to_vec(),
710                    value,
711                });
712            }
713        }
714
715        Ok((ops, removed))
716    }
717}
718
719fn decode_rows<E: EntityKind>(rows: Vec<DataRow>) -> Result<Vec<DeleteRow<E>>, InternalError> {
720    rows.into_iter()
721        .map(|(dk, raw)| {
722            let dk_for_err = dk.clone();
723            let entity = raw.try_decode::<E>().map_err(|err| {
724                ExecutorError::corruption(
725                    ErrorOrigin::Serialize,
726                    format!("failed to deserialize row: {dk_for_err} ({err})"),
727                )
728            })?;
729
730            let expected = dk.key();
731            let actual = entity.key();
732            if expected != actual {
733                return Err(ExecutorError::corruption(
734                    ErrorOrigin::Store,
735                    format!("row key mismatch: expected {expected}, found {actual}"),
736                )
737                .into());
738            }
739
740            Ok(DeleteRow {
741                key: dk,
742                raw: Some(raw),
743                entity,
744            })
745        })
746        .collect()
747}