Skip to main content

icydb_core/db/executor/
delete.rs

1use crate::{
2    db::{
3        CommitDataOp, CommitIndexOp, CommitKind, CommitMarker, Db, WriteUnit, begin_commit,
4        ensure_recovered,
5        executor::{
6            Context, ExecutorError, UniqueIndexHandle,
7            plan::{record_plan_metrics, set_rows_from_len},
8            resolve_unique_pk,
9            trace::{
10                QueryTraceSink, TraceAccess, TraceExecutorKind, TracePhase, start_exec_trace,
11                start_plan_trace,
12            },
13        },
14        finish_commit,
15        index::{
16            IndexEntry, IndexEntryCorruption, IndexKey, IndexStore, MAX_INDEX_ENTRY_BYTES,
17            RawIndexEntry, RawIndexKey,
18        },
19        query::plan::ExecutablePlan,
20        response::Response,
21        store::{DataKey, DataRow, RawDataKey, RawRow},
22        traits::FromKey,
23    },
24    error::{ErrorClass, ErrorOrigin, InternalError},
25    obs::sink::{self, ExecKind, MetricsEvent, Span},
26    prelude::*,
27    sanitize::sanitize,
28    traits::{EntityKind, Path, Storable},
29};
30use std::{
31    borrow::Cow, cell::RefCell, collections::BTreeMap, marker::PhantomData, thread::LocalKey,
32};
33
34///
35/// IndexPlan
36/// Prevalidated handle to an index store used during commit planning.
37///
38
39struct IndexPlan {
40    index: &'static IndexModel,
41    store: &'static LocalKey<RefCell<IndexStore>>,
42}
43
44// Prevalidated rollback mutation for index entries.
45struct PreparedIndexRollback {
46    store: &'static LocalKey<RefCell<IndexStore>>,
47    key: RawIndexKey,
48    value: Option<RawIndexEntry>,
49}
50
51// Prevalidated rollback mutation for data rows.
52struct PreparedDataRollback {
53    key: RawDataKey,
54    value: RawRow,
55}
56
57// Row wrapper used during delete planning and execution.
58struct DeleteRow<E> {
59    key: DataKey,
60    raw: Option<RawRow>,
61    entity: E,
62}
63
64impl<E: EntityKind> crate::db::query::plan::logical::PlanRow<E> for DeleteRow<E> {
65    fn entity(&self) -> &E {
66        &self.entity
67    }
68}
69
70///
71/// DeleteExecutor
72///
73/// Stage-1 atomicity invariant:
74/// All fallible validation completes before the first stable write.
75/// Mutations run inside a WriteUnit so mid-flight failures roll back
76/// before the commit marker is cleared.
77///
78#[derive(Clone, Copy)]
79pub struct DeleteExecutor<E: EntityKind> {
80    db: Db<E::Canister>,
81    debug: bool,
82    trace: Option<&'static dyn QueryTraceSink>,
83    _marker: PhantomData<E>,
84}
85
86impl<E: EntityKind> DeleteExecutor<E> {
87    #[must_use]
88    pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
89        Self {
90            db,
91            debug,
92            trace: None,
93            _marker: PhantomData,
94        }
95    }
96
97    #[must_use]
98    #[allow(dead_code)]
99    pub(crate) const fn with_trace_sink(
100        mut self,
101        sink: Option<&'static dyn QueryTraceSink>,
102    ) -> Self {
103        self.trace = sink;
104        self
105    }
106
107    #[must_use]
108    pub const fn debug(mut self) -> Self {
109        self.debug = true;
110        self
111    }
112
113    fn debug_log(&self, s: impl Into<String>) {
114        if self.debug {
115            println!("{}", s.into());
116        }
117    }
118
119    // ─────────────────────────────────────────────
120    // Unique-index delete
121    // ─────────────────────────────────────────────
122
123    pub fn by_unique_index(
124        self,
125        index: UniqueIndexHandle,
126        entity: E,
127    ) -> Result<Response<E>, InternalError>
128    where
129        E::PrimaryKey: FromKey,
130    {
131        let trace = start_exec_trace(
132            self.trace,
133            TraceExecutorKind::Delete,
134            E::PATH,
135            Some(TraceAccess::UniqueIndex {
136                name: index.index().name,
137            }),
138            Some(index.index().name),
139        );
140        let result = (|| {
141            self.debug_log(format!(
142                "[debug] delete by unique index on {} ({})",
143                E::PATH,
144                index.index().fields.join(", ")
145            ));
146            let mut span = Span::<E>::new(ExecKind::Delete);
147            ensure_recovered(&self.db)?;
148
149            let index = index.index();
150            let mut lookup = entity;
151            sanitize(&mut lookup)?;
152
153            // Resolve PK via unique index; absence is a no-op.
154            let Some(pk) = resolve_unique_pk::<E>(&self.db, index, &lookup)? else {
155                set_rows_from_len(&mut span, 0);
156                return Ok(Response(Vec::new()));
157            };
158
159            // Intentional re-decode for defense-in-depth; resolve_unique_pk only returns the key.
160            let (dk, stored_row, stored) = self.load_existing(pk)?;
161            let ctx = self.db.context::<E>();
162            let index_plans = self.build_index_plans()?;
163            let (index_ops, index_remove_count) =
164                Self::build_index_removal_ops(&index_plans, &[&stored])?;
165
166            // Preflight: ensure stores are accessible before committing.
167            ctx.with_store(|_| ())?;
168
169            let raw_key = dk.to_raw()?;
170            let marker = CommitMarker::new(
171                CommitKind::Delete,
172                index_ops,
173                vec![CommitDataOp {
174                    store: E::Store::PATH.to_string(),
175                    key: raw_key.as_bytes().to_vec(),
176                    value: None,
177                }],
178            )?;
179            let (index_apply_stores, index_rollback_ops) =
180                Self::prepare_index_delete_ops(&index_plans, &marker.index_ops)?;
181            let mut rollback_rows = BTreeMap::new();
182            rollback_rows.insert(raw_key, stored_row);
183            let data_rollback_ops =
184                Self::prepare_data_delete_ops(&marker.data_ops, &rollback_rows)?;
185            let commit = begin_commit(marker)?;
186
187            finish_commit(commit, |guard| {
188                let mut unit = WriteUnit::new("delete_unique_row_atomic");
189
190                // Commit boundary: apply the marker's raw mutations mechanically.
191                let index_rollback_ops = index_rollback_ops;
192                unit.record_rollback(move || Self::apply_index_rollbacks(index_rollback_ops));
193                Self::apply_marker_index_ops(&guard.marker.index_ops, index_apply_stores);
194                for _ in 0..index_remove_count {
195                    sink::record(MetricsEvent::IndexRemove {
196                        entity_path: E::PATH,
197                    });
198                }
199
200                unit.checkpoint("delete_unique_after_indexes")?;
201
202                // Apply data mutations recorded in the marker.
203                let data_rollback_ops = data_rollback_ops;
204                let db = self.db;
205                unit.record_rollback(move || Self::apply_data_rollbacks(db, data_rollback_ops));
206                unit.run(|| Self::apply_marker_data_ops(&guard.marker.data_ops, &ctx))?;
207
208                unit.checkpoint("delete_unique_after_data")?;
209                unit.commit();
210                Ok(())
211            })?;
212
213            set_rows_from_len(&mut span, 1);
214            Ok(Response(vec![(dk.key(), stored)]))
215        })();
216
217        if let Some(trace) = trace {
218            match &result {
219                Ok(resp) => trace.finish(u64::try_from(resp.0.len()).unwrap_or(u64::MAX)),
220                Err(err) => trace.error(err),
221            }
222        }
223
224        result
225    }
226
227    // ─────────────────────────────────────────────
228    // Plan-based delete
229    // ─────────────────────────────────────────────
230
231    #[allow(clippy::too_many_lines)]
232    pub fn execute(self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
233        if !plan.mode().is_delete() {
234            return Err(InternalError::new(
235                ErrorClass::Unsupported,
236                ErrorOrigin::Query,
237                "delete executor requires delete plans".to_string(),
238            ));
239        }
240        let trace = start_plan_trace(self.trace, TraceExecutorKind::Delete, &plan);
241        let result = (|| {
242            let plan = plan.into_inner();
243            ensure_recovered(&self.db)?;
244
245            self.debug_log(format!("[debug] delete plan on {}", E::PATH));
246
247            let mut span = Span::<E>::new(ExecKind::Delete);
248            record_plan_metrics(&plan.access);
249
250            let ctx = self.db.context::<E>();
251            // Access phase: resolve candidate rows before delete filtering.
252            let data_rows = ctx.rows_from_access_plan(&plan.access, plan.consistency)?;
253            sink::record(MetricsEvent::RowsScanned {
254                entity_path: E::PATH,
255                rows_scanned: data_rows.len() as u64,
256            });
257
258            // Decode rows into entities before post-access filtering.
259            let mut rows = decode_rows::<E>(data_rows)?;
260            let access_rows = rows.len();
261
262            // Post-access phase: filter, order, and apply delete limits.
263            let stats = plan.apply_post_access::<E, _>(&mut rows)?;
264            if stats.delete_limited {
265                self.debug_log(format!(
266                    "applied delete limit -> {} entities selected",
267                    rows.len()
268                ));
269            }
270
271            if rows.is_empty() {
272                if let Some(trace) = trace.as_ref() {
273                    let to_u64 = |len| u64::try_from(len).unwrap_or(u64::MAX);
274                    trace.phase(TracePhase::Access, to_u64(access_rows));
275                    trace.phase(TracePhase::Filter, to_u64(stats.rows_after_filter));
276                    trace.phase(TracePhase::Order, to_u64(stats.rows_after_order));
277                    trace.phase(
278                        TracePhase::DeleteLimit,
279                        to_u64(stats.rows_after_delete_limit),
280                    );
281                }
282                set_rows_from_len(&mut span, 0);
283                return Ok(Response(Vec::new()));
284            }
285
286            let index_plans = self.build_index_plans()?;
287            let (index_ops, index_remove_count) = {
288                let entities: Vec<&E> = rows.iter().map(|row| &row.entity).collect();
289                Self::build_index_removal_ops(&index_plans, &entities)?
290            };
291
292            // Preflight store access to ensure no fallible work remains post-commit.
293            ctx.with_store(|_| ())?;
294
295            let mut rollback_rows = BTreeMap::new();
296            let data_ops = rows
297                .iter_mut()
298                .map(|row| {
299                    let raw_key = row.key.to_raw()?;
300                    let raw_row = row.raw.take().ok_or_else(|| {
301                        InternalError::new(
302                            ErrorClass::Internal,
303                            ErrorOrigin::Store,
304                            "missing raw row for delete rollback".to_string(),
305                        )
306                    })?;
307                    rollback_rows.insert(raw_key, raw_row);
308                    Ok(CommitDataOp {
309                        store: E::Store::PATH.to_string(),
310                        key: raw_key.as_bytes().to_vec(),
311                        value: None,
312                    })
313                })
314                .collect::<Result<Vec<_>, InternalError>>()?;
315
316            let marker = CommitMarker::new(CommitKind::Delete, index_ops, data_ops)?;
317            let (index_apply_stores, index_rollback_ops) =
318                Self::prepare_index_delete_ops(&index_plans, &marker.index_ops)?;
319            let data_rollback_ops =
320                Self::prepare_data_delete_ops(&marker.data_ops, &rollback_rows)?;
321            let commit = begin_commit(marker)?;
322
323            finish_commit(commit, |guard| {
324                let mut unit = WriteUnit::new("delete_rows_atomic");
325
326                // Commit boundary: apply the marker's raw mutations mechanically.
327                let index_rollback_ops = index_rollback_ops;
328                unit.record_rollback(move || Self::apply_index_rollbacks(index_rollback_ops));
329                Self::apply_marker_index_ops(&guard.marker.index_ops, index_apply_stores);
330                for _ in 0..index_remove_count {
331                    sink::record(MetricsEvent::IndexRemove {
332                        entity_path: E::PATH,
333                    });
334                }
335
336                unit.checkpoint("delete_after_indexes")?;
337
338                // Apply data mutations recorded in the marker.
339                let data_rollback_ops = data_rollback_ops;
340                let db = self.db;
341                unit.record_rollback(move || Self::apply_data_rollbacks(db, data_rollback_ops));
342                unit.run(|| Self::apply_marker_data_ops(&guard.marker.data_ops, &ctx))?;
343
344                unit.checkpoint("delete_after_data")?;
345                unit.commit();
346
347                Ok(())
348            })?;
349
350            // Emit per-phase counts after the delete succeeds.
351            if let Some(trace) = trace.as_ref() {
352                let to_u64 = |len| u64::try_from(len).unwrap_or(u64::MAX);
353                trace.phase(TracePhase::Access, to_u64(access_rows));
354                trace.phase(TracePhase::Filter, to_u64(stats.rows_after_filter));
355                trace.phase(TracePhase::Order, to_u64(stats.rows_after_order));
356                trace.phase(
357                    TracePhase::DeleteLimit,
358                    to_u64(stats.rows_after_delete_limit),
359                );
360            }
361
362            let res = rows
363                .into_iter()
364                .map(|row| (row.key.key(), row.entity))
365                .collect::<Vec<_>>();
366            set_rows_from_len(&mut span, res.len());
367
368            Ok(Response(res))
369        })();
370
371        if let Some(trace) = trace {
372            match &result {
373                Ok(resp) => trace.finish(u64::try_from(resp.0.len()).unwrap_or(u64::MAX)),
374                Err(err) => trace.error(err),
375            }
376        }
377
378        result
379    }
380
381    // ─────────────────────────────────────────────
382    // Helpers
383    // ─────────────────────────────────────────────
384
385    // Resolve commit marker index ops into stores and rollback bytes before committing.
386    #[expect(clippy::type_complexity)]
387    fn prepare_index_delete_ops(
388        plans: &[IndexPlan],
389        ops: &[CommitIndexOp],
390    ) -> Result<
391        (
392            Vec<&'static LocalKey<RefCell<IndexStore>>>,
393            Vec<PreparedIndexRollback>,
394        ),
395        InternalError,
396    > {
397        // Resolve store handles once so commit-time apply is mechanical.
398        let mut stores = BTreeMap::new();
399        for plan in plans {
400            stores.insert(plan.index.store, plan.store);
401        }
402
403        let mut apply_stores = Vec::with_capacity(ops.len());
404        let mut rollbacks = Vec::with_capacity(ops.len());
405
406        // Prevalidate commit ops and capture rollback bytes from current state.
407        for op in ops {
408            let store = stores.get(op.store.as_str()).ok_or_else(|| {
409                InternalError::new(
410                    ErrorClass::Internal,
411                    ErrorOrigin::Index,
412                    format!(
413                        "commit marker references unknown index store '{}' ({})",
414                        op.store,
415                        E::PATH
416                    ),
417                )
418            })?;
419            if op.key.len() != IndexKey::STORED_SIZE as usize {
420                return Err(InternalError::new(
421                    ErrorClass::Internal,
422                    ErrorOrigin::Index,
423                    format!(
424                        "commit marker index key length {} does not match {} ({})",
425                        op.key.len(),
426                        IndexKey::STORED_SIZE,
427                        E::PATH
428                    ),
429                ));
430            }
431            if let Some(value) = &op.value
432                && value.len() > MAX_INDEX_ENTRY_BYTES as usize
433            {
434                return Err(InternalError::new(
435                    ErrorClass::Internal,
436                    ErrorOrigin::Index,
437                    format!(
438                        "commit marker index entry exceeds max size: {} bytes ({})",
439                        value.len(),
440                        E::PATH
441                    ),
442                ));
443            }
444
445            let raw_key = RawIndexKey::from_bytes(Cow::Borrowed(op.key.as_slice()));
446            let rollback_value = store.with_borrow(|s| s.get(&raw_key)).ok_or_else(|| {
447                InternalError::new(
448                    ErrorClass::Internal,
449                    ErrorOrigin::Index,
450                    format!(
451                        "commit marker index op missing entry before delete: {} ({})",
452                        op.store,
453                        E::PATH
454                    ),
455                )
456            })?;
457
458            apply_stores.push(*store);
459            rollbacks.push(PreparedIndexRollback {
460                store,
461                key: raw_key,
462                value: Some(rollback_value),
463            });
464        }
465
466        Ok((apply_stores, rollbacks))
467    }
468
469    // Resolve commit marker data ops and capture rollback rows before committing.
470    fn prepare_data_delete_ops(
471        ops: &[CommitDataOp],
472        rollback_rows: &BTreeMap<RawDataKey, RawRow>,
473    ) -> Result<Vec<PreparedDataRollback>, InternalError> {
474        let mut rollbacks = Vec::with_capacity(ops.len());
475
476        // Validate marker ops and map them to rollback rows.
477        for op in ops {
478            if op.store != E::Store::PATH {
479                return Err(InternalError::new(
480                    ErrorClass::Internal,
481                    ErrorOrigin::Store,
482                    format!(
483                        "commit marker references unexpected data store '{}' ({})",
484                        op.store,
485                        E::PATH
486                    ),
487                ));
488            }
489            if op.key.len() != DataKey::STORED_SIZE as usize {
490                return Err(InternalError::new(
491                    ErrorClass::Internal,
492                    ErrorOrigin::Store,
493                    format!(
494                        "commit marker data key length {} does not match {} ({})",
495                        op.key.len(),
496                        DataKey::STORED_SIZE,
497                        E::PATH
498                    ),
499                ));
500            }
501            if op.value.is_some() {
502                return Err(InternalError::new(
503                    ErrorClass::Internal,
504                    ErrorOrigin::Store,
505                    format!("commit marker delete includes data payload ({})", E::PATH),
506                ));
507            }
508
509            let raw_key = RawDataKey::from_bytes(Cow::Borrowed(op.key.as_slice()));
510            let raw_row = rollback_rows.get(&raw_key).ok_or_else(|| {
511                InternalError::new(
512                    ErrorClass::Internal,
513                    ErrorOrigin::Store,
514                    format!("commit marker data op missing rollback row ({})", E::PATH),
515                )
516            })?;
517            rollbacks.push(PreparedDataRollback {
518                key: raw_key,
519                value: raw_row.clone(),
520            });
521        }
522
523        Ok(rollbacks)
524    }
525
526    // Apply commit marker index ops using pre-resolved stores.
527    fn apply_marker_index_ops(
528        ops: &[CommitIndexOp],
529        stores: Vec<&'static LocalKey<RefCell<IndexStore>>>,
530    ) {
531        debug_assert_eq!(
532            ops.len(),
533            stores.len(),
534            "commit marker index ops length mismatch"
535        );
536
537        for (op, store) in ops.iter().zip(stores.into_iter()) {
538            debug_assert_eq!(op.key.len(), IndexKey::STORED_SIZE as usize);
539            let raw_key = RawIndexKey::from_bytes(Cow::Borrowed(op.key.as_slice()));
540
541            store.with_borrow_mut(|s| {
542                if let Some(value) = &op.value {
543                    debug_assert!(value.len() <= MAX_INDEX_ENTRY_BYTES as usize);
544                    let raw_entry = RawIndexEntry::from_bytes(Cow::Borrowed(value.as_slice()));
545                    s.insert(raw_key, raw_entry);
546                } else {
547                    s.remove(&raw_key);
548                }
549            });
550        }
551    }
552
553    // Apply rollback mutations for index entries using raw bytes.
554    fn apply_index_rollbacks(ops: Vec<PreparedIndexRollback>) {
555        for op in ops {
556            op.store.with_borrow_mut(|s| {
557                if let Some(value) = op.value {
558                    s.insert(op.key, value);
559                } else {
560                    s.remove(&op.key);
561                }
562            });
563        }
564    }
565
566    // Apply commit marker data deletes using raw keys only.
567    fn apply_marker_data_ops(
568        ops: &[CommitDataOp],
569        ctx: &Context<'_, E>,
570    ) -> Result<(), InternalError> {
571        for op in ops {
572            debug_assert!(op.value.is_none());
573            let raw_key = RawDataKey::from_bytes(Cow::Borrowed(op.key.as_slice()));
574            ctx.with_store_mut(|s| s.remove(&raw_key))?;
575        }
576        Ok(())
577    }
578
579    // Apply rollback mutations for data rows.
580    fn apply_data_rollbacks(db: Db<E::Canister>, ops: Vec<PreparedDataRollback>) {
581        let ctx = db.context::<E>();
582        for op in ops {
583            let _ = ctx.with_store_mut(|s| s.insert(op.key, op.value));
584        }
585    }
586
587    fn load_existing(&self, pk: E::PrimaryKey) -> Result<(DataKey, RawRow, E), InternalError> {
588        let dk = DataKey::new::<E>(pk.into());
589        let row = self.db.context::<E>().read_strict(&dk)?;
590        let entity = row.try_decode::<E>().map_err(|err| {
591            ExecutorError::corruption(
592                ErrorOrigin::Serialize,
593                format!("failed to deserialize row: {dk} ({err})"),
594            )
595        })?;
596        Ok((dk, row, entity))
597    }
598
599    fn build_index_plans(&self) -> Result<Vec<IndexPlan>, InternalError> {
600        E::INDEXES
601            .iter()
602            .map(|index| {
603                let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
604                Ok(IndexPlan { index, store })
605            })
606            .collect()
607    }
608
609    // Build commit-time index ops and count entity-level removals for metrics.
610    #[expect(clippy::too_many_lines)]
611    fn build_index_removal_ops(
612        plans: &[IndexPlan],
613        entities: &[&E],
614    ) -> Result<(Vec<CommitIndexOp>, usize), InternalError> {
615        let mut ops = Vec::new();
616        let mut removed = 0usize;
617
618        // Process each index independently to compute its resulting mutations.
619        for plan in plans {
620            let fields = plan.index.fields.join(", ");
621
622            // Map raw index keys → updated entry (or None if fully removed).
623            let mut entries: BTreeMap<RawIndexKey, Option<IndexEntry>> = BTreeMap::new();
624
625            // Fold entity deletions into per-key index entry updates.
626            for entity in entities {
627                let Some(key) = IndexKey::new(*entity, plan.index)? else {
628                    continue;
629                };
630                let raw_key = key.to_raw();
631                let entity_key = entity.key();
632
633                // Lazily load and decode the existing index entry once per key.
634                let entry = match entries.entry(raw_key) {
635                    std::collections::btree_map::Entry::Vacant(slot) => {
636                        let decoded = plan
637                            .store
638                            .with_borrow(|s| s.get(&raw_key))
639                            .map(|raw| {
640                                raw.try_decode().map_err(|err| {
641                                    ExecutorError::corruption(
642                                        ErrorOrigin::Index,
643                                        format!(
644                                            "index corrupted: {} ({}) -> {}",
645                                            E::PATH,
646                                            fields,
647                                            err
648                                        ),
649                                    )
650                                })
651                            })
652                            .transpose()?;
653                        slot.insert(decoded)
654                    }
655                    std::collections::btree_map::Entry::Occupied(slot) => slot.into_mut(),
656                };
657
658                // Prevalidate membership to keep commit-phase mutations infallible.
659                let Some(e) = entry.as_ref() else {
660                    return Err(ExecutorError::corruption(
661                        ErrorOrigin::Index,
662                        format!(
663                            "index corrupted: {} ({}) -> {}",
664                            E::PATH,
665                            fields,
666                            IndexEntryCorruption::missing_key(raw_key, entity_key),
667                        ),
668                    )
669                    .into());
670                };
671
672                if plan.index.unique && e.len() > 1 {
673                    return Err(ExecutorError::corruption(
674                        ErrorOrigin::Index,
675                        format!(
676                            "index corrupted: {} ({}) -> {}",
677                            E::PATH,
678                            fields,
679                            IndexEntryCorruption::NonUniqueEntry { keys: e.len() },
680                        ),
681                    )
682                    .into());
683                }
684
685                if !e.contains(&entity_key) {
686                    return Err(ExecutorError::corruption(
687                        ErrorOrigin::Index,
688                        format!(
689                            "index corrupted: {} ({}) -> {}",
690                            E::PATH,
691                            fields,
692                            IndexEntryCorruption::missing_key(raw_key, entity_key),
693                        ),
694                    )
695                    .into());
696                }
697                removed = removed.saturating_add(1);
698
699                // Remove this entity’s key from the index entry.
700                if let Some(e) = entry.as_mut() {
701                    e.remove_key(&entity_key);
702                    if e.is_empty() {
703                        *entry = None;
704                    }
705                }
706            }
707
708            // Emit commit ops for each touched index key.
709            for (raw_key, entry) in entries {
710                let value = if let Some(entry) = entry {
711                    let raw = RawIndexEntry::try_from_entry(&entry).map_err(|err| match err {
712                        crate::db::index::entry::IndexEntryEncodeError::TooManyKeys { keys } => {
713                            InternalError::new(
714                                ErrorClass::Corruption,
715                                ErrorOrigin::Index,
716                                format!(
717                                    "index corrupted: {} ({}) -> {}",
718                                    E::PATH,
719                                    fields,
720                                    IndexEntryCorruption::TooManyKeys { count: keys }
721                                ),
722                            )
723                        }
724                        crate::db::index::entry::IndexEntryEncodeError::KeyEncoding(err) => {
725                            InternalError::new(
726                                ErrorClass::Unsupported,
727                                ErrorOrigin::Index,
728                                format!(
729                                    "index key encoding failed: {} ({fields}) -> {err}",
730                                    E::PATH
731                                ),
732                            )
733                        }
734                    })?;
735                    Some(raw.as_bytes().to_vec())
736                } else {
737                    // None means the index entry is fully removed.
738                    None
739                };
740
741                ops.push(CommitIndexOp {
742                    store: plan.index.store.to_string(),
743                    key: raw_key.as_bytes().to_vec(),
744                    value,
745                });
746            }
747        }
748
749        Ok((ops, removed))
750    }
751}
752
753fn decode_rows<E: EntityKind>(rows: Vec<DataRow>) -> Result<Vec<DeleteRow<E>>, InternalError> {
754    rows.into_iter()
755        .map(|(dk, raw)| {
756            let dk_for_err = dk.clone();
757            let entity = raw.try_decode::<E>().map_err(|err| {
758                ExecutorError::corruption(
759                    ErrorOrigin::Serialize,
760                    format!("failed to deserialize row: {dk_for_err} ({err})"),
761                )
762            })?;
763
764            let expected = dk.key();
765            let actual = entity.key();
766            if expected != actual {
767                return Err(ExecutorError::corruption(
768                    ErrorOrigin::Store,
769                    format!("row key mismatch: expected {expected}, found {actual}"),
770                )
771                .into());
772            }
773
774            Ok(DeleteRow {
775                key: dk,
776                raw: Some(raw),
777                entity,
778            })
779        })
780        .collect()
781}