Skip to main content

icydb_core/db/index/
plan.rs

1use crate::{
2    db::{
3        CommitIndexOp,
4        executor::ExecutorError,
5        index::{
6            IndexEntry, IndexEntryCorruption, IndexEntryEncodeError, IndexKey, IndexStore,
7            RawIndexEntry, RawIndexKey,
8        },
9        store::DataKey,
10    },
11    error::{ErrorClass, ErrorOrigin, InternalError},
12    key::Key,
13    model::index::IndexModel,
14    obs::sink::{self, MetricsEvent},
15    traits::{EntityKind, Storable},
16};
17use std::{cell::RefCell, collections::BTreeMap, thread::LocalKey};
18
19///
20/// IndexApplyPlan
21///
22
23#[derive(Debug)]
24pub struct IndexApplyPlan {
25    pub index: &'static IndexModel,
26    pub store: &'static LocalKey<RefCell<IndexStore>>,
27}
28
29///
30/// IndexMutationPlan
31///
32
33#[derive(Debug)]
34pub struct IndexMutationPlan {
35    pub apply: Vec<IndexApplyPlan>,
36    pub commit_ops: Vec<CommitIndexOp>,
37}
38
39/// Plan all index mutations for a single entity transition.
40///
41/// This function:
42/// - Loads existing index entries
43/// - Validates unique constraints
44/// - Computes the exact index writes/deletes required
45///
46/// All fallible work happens here. The returned plan is safe to apply
47/// infallibly after a commit marker is written.
48pub fn plan_index_mutation_for_entity<E: EntityKind>(
49    db: &crate::db::Db<E::Canister>,
50    old: Option<&E>,
51    new: Option<&E>,
52) -> Result<IndexMutationPlan, InternalError> {
53    let old_entity_key = old.map(EntityKind::key);
54    let new_entity_key = new.map(EntityKind::key);
55
56    let mut apply = Vec::with_capacity(E::INDEXES.len());
57    let mut commit_ops = Vec::new();
58
59    for index in E::INDEXES {
60        let store = db.with_index(|reg| reg.try_get_store(index.store))?;
61
62        let old_key = match old {
63            Some(entity) => IndexKey::new(entity, index)?,
64            None => None,
65        };
66        let new_key = match new {
67            Some(entity) => IndexKey::new(entity, index)?,
68            None => None,
69        };
70
71        let old_entry = load_existing_entry(store, index, old)?;
72        // Prevalidate membership so commit-phase mutations cannot surface corruption.
73        if let Some(old_key) = &old_key {
74            let Some(old_entity_key) = old_entity_key else {
75                return Err(InternalError::new(
76                    ErrorClass::Internal,
77                    ErrorOrigin::Index,
78                    "missing old entity key for index removal".to_string(),
79                ));
80            };
81            let entry = old_entry.as_ref().ok_or_else(|| {
82                ExecutorError::corruption(
83                    ErrorOrigin::Index,
84                    format!(
85                        "index corrupted: {} ({}) -> {}",
86                        E::PATH,
87                        index.fields.join(", "),
88                        IndexEntryCorruption::missing_key(old_key.to_raw(), old_entity_key)
89                    ),
90                )
91            })?;
92            if index.unique && entry.len() > 1 {
93                return Err(ExecutorError::corruption(
94                    ErrorOrigin::Index,
95                    format!(
96                        "index corrupted: {} ({}) -> {}",
97                        E::PATH,
98                        index.fields.join(", "),
99                        IndexEntryCorruption::NonUniqueEntry { keys: entry.len() }
100                    ),
101                )
102                .into());
103            }
104            if !entry.contains(&old_entity_key) {
105                return Err(ExecutorError::corruption(
106                    ErrorOrigin::Index,
107                    format!(
108                        "index corrupted: {} ({}) -> {}",
109                        E::PATH,
110                        index.fields.join(", "),
111                        IndexEntryCorruption::missing_key(old_key.to_raw(), old_entity_key)
112                    ),
113                )
114                .into());
115            }
116        }
117        let new_entry = if old_key == new_key {
118            old_entry.clone()
119        } else {
120            load_existing_entry(store, index, new)?
121        };
122
123        validate_unique_constraint::<E>(
124            db,
125            index,
126            new_entry.as_ref(),
127            new_entity_key.as_ref(),
128            new,
129        )?;
130
131        build_commit_ops_for_index::<E>(
132            &mut commit_ops,
133            index,
134            old_key,
135            new_key,
136            old_entry,
137            new_entry,
138            old_entity_key,
139            new_entity_key,
140        )?;
141
142        apply.push(IndexApplyPlan { index, store });
143    }
144
145    Ok(IndexMutationPlan { apply, commit_ops })
146}
147
148fn load_existing_entry<E: EntityKind>(
149    store: &'static LocalKey<RefCell<IndexStore>>,
150    index: &'static IndexModel,
151    entity: Option<&E>,
152) -> Result<Option<IndexEntry>, InternalError> {
153    let Some(entity) = entity else {
154        return Ok(None);
155    };
156    let Some(key) = IndexKey::new(entity, index)? else {
157        return Ok(None);
158    };
159
160    store
161        .with_borrow(|s| s.get(&key.to_raw()))
162        .map(|raw| {
163            raw.try_decode().map_err(|err| {
164                ExecutorError::corruption(
165                    ErrorOrigin::Index,
166                    format!(
167                        "index corrupted: {} ({}) -> {}",
168                        E::PATH,
169                        index.fields.join(", "),
170                        err
171                    ),
172                )
173                .into()
174            })
175        })
176        .transpose()
177}
178
179/// Validate unique index constraints against the existing index entry.
180///
181/// This detects:
182/// - Index corruption (multiple keys in a unique entry)
183/// - Uniqueness violations (conflicting key ownership)
184#[expect(clippy::too_many_lines)]
185fn validate_unique_constraint<E: EntityKind>(
186    db: &crate::db::Db<E::Canister>,
187    index: &IndexModel,
188    entry: Option<&IndexEntry>,
189    new_key: Option<&Key>,
190    new_entity: Option<&E>,
191) -> Result<(), InternalError> {
192    if !index.unique {
193        return Ok(());
194    }
195
196    let Some(entry) = entry else {
197        return Ok(());
198    };
199
200    if entry.len() > 1 {
201        return Err(ExecutorError::corruption(
202            ErrorOrigin::Index,
203            format!(
204                "index corrupted: {} ({}) -> {} keys",
205                E::PATH,
206                index.fields.join(", "),
207                entry.len()
208            ),
209        )
210        .into());
211    }
212
213    let Some(new_key) = new_key else {
214        return Ok(());
215    };
216    if entry.contains(new_key) {
217        return Ok(());
218    }
219
220    let Some(new_entity) = new_entity else {
221        return Err(InternalError::new(
222            ErrorClass::InvariantViolation,
223            ErrorOrigin::Index,
224            "missing entity payload during unique validation".to_string(),
225        ));
226    };
227    let existing_key = entry.single_key().ok_or_else(|| {
228        ExecutorError::corruption(
229            ErrorOrigin::Index,
230            format!(
231                "index corrupted: {} ({}) -> {} keys",
232                E::PATH,
233                index.fields.join(", "),
234                entry.len()
235            ),
236        )
237    })?;
238
239    let stored = {
240        let data_key = DataKey::new::<E>(existing_key);
241        let row = db.context::<E>().read_strict(&data_key)?;
242        row.try_decode::<E>().map_err(|err| {
243            ExecutorError::corruption(
244                ErrorOrigin::Serialize,
245                format!("failed to deserialize row: {data_key} ({err})"),
246            )
247        })?
248    };
249    let stored_key = stored.key();
250    if stored_key != existing_key {
251        // Stored row decoded successfully but key mismatch indicates index/data divergence; treat as corruption.
252        return Err(ExecutorError::corruption(
253            ErrorOrigin::Store,
254            format!(
255                "index corrupted: {} ({}) -> {}",
256                E::PATH,
257                index.fields.join(", "),
258                IndexEntryCorruption::RowKeyMismatch {
259                    indexed_key: Box::new(existing_key),
260                    row_key: Box::new(stored_key),
261                }
262            ),
263        )
264        .into());
265    }
266
267    for field in index.fields {
268        let expected = new_entity.get_value(field).ok_or_else(|| {
269            InternalError::new(
270                ErrorClass::InvariantViolation,
271                ErrorOrigin::Index,
272                format!(
273                    "index field missing on lookup entity: {} ({})",
274                    E::PATH,
275                    field
276                ),
277            )
278        })?;
279        let actual = stored.get_value(field).ok_or_else(|| {
280            ExecutorError::corruption(
281                ErrorOrigin::Index,
282                format!(
283                    "index corrupted: {} ({}) -> stored entity missing field",
284                    E::PATH,
285                    field
286                ),
287            )
288        })?;
289
290        if expected != actual {
291            return Err(ExecutorError::corruption(
292                ErrorOrigin::Index,
293                format!("index hash collision: {} ({})", E::PATH, field),
294            )
295            .into());
296        }
297    }
298
299    sink::record(MetricsEvent::UniqueViolation {
300        entity_path: E::PATH,
301    });
302
303    Err(ExecutorError::index_violation(E::PATH, index.fields).into())
304}
305
306/// Compute commit-time index operations for a single index.
307///
308/// Produces a minimal set of index updates:
309/// - `Some(bytes)` → insert/update index entry
310/// - `None`        → delete index entry
311///
312/// Correctly handles old/new key overlap and guarantees that
313/// apply-time mutations cannot fail except by invariant violation.
314#[allow(clippy::too_many_arguments)]
315fn build_commit_ops_for_index<E: EntityKind>(
316    commit_ops: &mut Vec<CommitIndexOp>,
317    index: &'static IndexModel,
318    old_key: Option<IndexKey>,
319    new_key: Option<IndexKey>,
320    old_entry: Option<IndexEntry>,
321    new_entry: Option<IndexEntry>,
322    old_entity_key: Option<Key>,
323    new_entity_key: Option<Key>,
324) -> Result<(), InternalError> {
325    let mut touched: BTreeMap<RawIndexKey, Option<IndexEntry>> = BTreeMap::new();
326    let fields = index.fields.join(", ");
327
328    // ── Removal ────────────────────────────────
329
330    if let Some(old_key) = old_key {
331        let Some(old_entity_key) = old_entity_key else {
332            return Err(InternalError::new(
333                ErrorClass::Internal,
334                ErrorOrigin::Index,
335                "missing old entity key for index removal".to_string(),
336            ));
337        };
338
339        if let Some(mut entry) = old_entry {
340            entry.remove_key(&old_entity_key);
341            let after = if entry.is_empty() { None } else { Some(entry) };
342            touched.insert(old_key.to_raw(), after);
343        } else {
344            // No existing index entry -> nothing to remove.
345            touched.insert(old_key.to_raw(), None);
346        }
347    }
348
349    // ── Insertion ──────────────────────────────
350
351    if let Some(new_key) = new_key {
352        let Some(new_entity_key) = new_entity_key else {
353            return Err(InternalError::new(
354                ErrorClass::Internal,
355                ErrorOrigin::Index,
356                "missing new entity key for index insertion".to_string(),
357            ));
358        };
359
360        let raw_key = new_key.to_raw();
361
362        // Start from:
363        //   1. result of removal (if same key)
364        //   2. existing entry loaded from store
365        //   3. brand new entry
366        let mut entry = if let Some(existing) = touched.remove(&raw_key) {
367            existing.unwrap_or_else(|| IndexEntry::new(new_entity_key))
368        } else if let Some(existing) = new_entry {
369            existing
370        } else {
371            IndexEntry::new(new_entity_key)
372        };
373
374        entry.insert_key(new_entity_key);
375        touched.insert(raw_key, Some(entry));
376    }
377
378    // ── Emit commit ops ────────────────────────
379
380    for (raw_key, entry) in touched {
381        let value = if let Some(entry) = entry {
382            let raw = RawIndexEntry::try_from_entry(&entry).map_err(|err| match err {
383                IndexEntryEncodeError::TooManyKeys { keys } => InternalError::new(
384                    ErrorClass::Unsupported,
385                    ErrorOrigin::Index,
386                    format!(
387                        "index entry exceeds max keys: {} ({}) -> {} keys",
388                        E::PATH,
389                        fields,
390                        keys
391                    ),
392                ),
393                IndexEntryEncodeError::KeyEncoding(err) => InternalError::new(
394                    ErrorClass::Unsupported,
395                    ErrorOrigin::Index,
396                    format!(
397                        "index entry key encoding failed: {} ({}) -> {err}",
398                        E::PATH,
399                        fields
400                    ),
401                ),
402            })?;
403            Some(raw.into_bytes())
404        } else {
405            None
406        };
407
408        commit_ops.push(CommitIndexOp {
409            store: index.store.to_string(),
410            key: raw_key.as_bytes().to_vec(),
411            value,
412        });
413    }
414
415    Ok(())
416}
417
418#[cfg(test)]
419mod tests {
420    use super::*;
421    use crate::{
422        db::{
423            Db,
424            index::IndexStoreRegistry,
425            index::fingerprint::with_test_hash_override,
426            store::{DataStore, DataStoreRegistry, RawRow},
427        },
428        error::{ErrorClass, ErrorOrigin},
429        model::{
430            entity::EntityModel,
431            field::{EntityFieldKind, EntityFieldModel},
432            index::IndexModel,
433        },
434        serialize::serialize,
435        traits::{
436            CanisterKind, DataStoreKind, EntityKind, FieldValues, Path, SanitizeAuto,
437            SanitizeCustom, ValidateAuto, ValidateCustom, View, Visitable,
438        },
439        types::Ulid,
440        value::Value,
441    };
442    use serde::{Deserialize, Serialize};
443    use std::cell::RefCell;
444
445    const CANISTER_PATH: &str = "index_plan_test::TestCanister";
446    const DATA_STORE_PATH: &str = "index_plan_test::TestDataStore";
447    const INDEX_STORE_PATH: &str = "index_plan_test::TestIndexStore";
448    const ENTITY_PATH: &str = "index_plan_test::TestEntity";
449
450    const INDEX_FIELDS: [&str; 1] = ["tag"];
451    const INDEX_MODEL: IndexModel = IndexModel::new(
452        "index_plan_test::idx_tag",
453        INDEX_STORE_PATH,
454        &INDEX_FIELDS,
455        true,
456    );
457    const INDEXES: [&IndexModel; 1] = [&INDEX_MODEL];
458
459    const TEST_FIELDS: [EntityFieldModel; 2] = [
460        EntityFieldModel {
461            name: "id",
462            kind: EntityFieldKind::Ulid,
463        },
464        EntityFieldModel {
465            name: "tag",
466            kind: EntityFieldKind::Text,
467        },
468    ];
469    const TEST_MODEL: EntityModel = EntityModel {
470        path: ENTITY_PATH,
471        entity_name: "TestEntity",
472        primary_key: &TEST_FIELDS[0],
473        fields: &TEST_FIELDS,
474        indexes: &INDEXES,
475    };
476    const MISSING_ENTITY_PATH: &str = "index_plan_test::MissingFieldEntity";
477    const MISSING_MODEL: EntityModel = EntityModel {
478        path: MISSING_ENTITY_PATH,
479        entity_name: "MissingFieldEntity",
480        primary_key: &TEST_FIELDS[0],
481        fields: &TEST_FIELDS,
482        indexes: &INDEXES,
483    };
484
485    #[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
486    struct TestEntity {
487        id: Ulid,
488        tag: String,
489    }
490
491    impl Path for TestEntity {
492        const PATH: &'static str = ENTITY_PATH;
493    }
494
495    impl View for TestEntity {
496        type ViewType = Self;
497
498        fn to_view(&self) -> Self::ViewType {
499            self.clone()
500        }
501
502        fn from_view(view: Self::ViewType) -> Self {
503            view
504        }
505    }
506
507    impl SanitizeAuto for TestEntity {}
508    impl SanitizeCustom for TestEntity {}
509    impl ValidateAuto for TestEntity {}
510    impl ValidateCustom for TestEntity {}
511    impl Visitable for TestEntity {}
512
513    impl FieldValues for TestEntity {
514        fn get_value(&self, field: &str) -> Option<Value> {
515            match field {
516                "id" => Some(Value::Ulid(self.id)),
517                "tag" => Some(Value::Text(self.tag.clone())),
518                _ => None,
519            }
520        }
521    }
522
523    #[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
524    struct MissingFieldEntity {
525        id: Ulid,
526        tag: String,
527    }
528
529    impl Path for MissingFieldEntity {
530        const PATH: &'static str = MISSING_ENTITY_PATH;
531    }
532
533    impl View for MissingFieldEntity {
534        type ViewType = Self;
535
536        fn to_view(&self) -> Self::ViewType {
537            self.clone()
538        }
539
540        fn from_view(view: Self::ViewType) -> Self {
541            view
542        }
543    }
544
545    impl SanitizeAuto for MissingFieldEntity {}
546    impl SanitizeCustom for MissingFieldEntity {}
547    impl ValidateAuto for MissingFieldEntity {}
548    impl ValidateCustom for MissingFieldEntity {}
549    impl Visitable for MissingFieldEntity {}
550
551    impl FieldValues for MissingFieldEntity {
552        fn get_value(&self, field: &str) -> Option<Value> {
553            match field {
554                "id" => Some(Value::Ulid(self.id)),
555                "tag" if self.tag == "__missing__" => None,
556                "tag" => Some(Value::Text(self.tag.clone())),
557                _ => None,
558            }
559        }
560    }
561
562    #[derive(Clone, Copy)]
563    struct TestCanister;
564
565    impl Path for TestCanister {
566        const PATH: &'static str = CANISTER_PATH;
567    }
568
569    impl CanisterKind for TestCanister {}
570
571    struct TestStore;
572
573    impl Path for TestStore {
574        const PATH: &'static str = DATA_STORE_PATH;
575    }
576
577    impl DataStoreKind for TestStore {
578        type Canister = TestCanister;
579    }
580
581    impl EntityKind for TestEntity {
582        type PrimaryKey = Ulid;
583        type DataStore = TestStore;
584        type Canister = TestCanister;
585
586        const ENTITY_NAME: &'static str = "TestEntity";
587        const PRIMARY_KEY: &'static str = "id";
588        const FIELDS: &'static [&'static str] = &["id", "tag"];
589        const INDEXES: &'static [&'static IndexModel] = &INDEXES;
590        const MODEL: &'static EntityModel = &TEST_MODEL;
591
592        fn key(&self) -> crate::key::Key {
593            self.id.into()
594        }
595
596        fn primary_key(&self) -> Self::PrimaryKey {
597            self.id
598        }
599
600        fn set_primary_key(&mut self, key: Self::PrimaryKey) {
601            self.id = key;
602        }
603    }
604
605    impl EntityKind for MissingFieldEntity {
606        type PrimaryKey = Ulid;
607        type DataStore = TestStore;
608        type Canister = TestCanister;
609
610        const ENTITY_NAME: &'static str = "MissingFieldEntity";
611        const PRIMARY_KEY: &'static str = "id";
612        const FIELDS: &'static [&'static str] = &["id", "tag"];
613        const INDEXES: &'static [&'static IndexModel] = &INDEXES;
614        const MODEL: &'static EntityModel = &MISSING_MODEL;
615
616        fn key(&self) -> crate::key::Key {
617            self.id.into()
618        }
619
620        fn primary_key(&self) -> Self::PrimaryKey {
621            self.id
622        }
623
624        fn set_primary_key(&mut self, key: Self::PrimaryKey) {
625            self.id = key;
626        }
627    }
628
629    canic_memory::eager_static! {
630        static TEST_DATA_STORE: RefCell<DataStore> =
631            RefCell::new(DataStore::init(canic_memory::ic_memory!(DataStore, 20)));
632    }
633
634    canic_memory::eager_static! {
635        static TEST_INDEX_STORE: RefCell<IndexStore> =
636            RefCell::new(IndexStore::init(
637                canic_memory::ic_memory!(IndexStore, 21),
638                canic_memory::ic_memory!(IndexStore, 22),
639            ));
640    }
641
642    thread_local! {
643        static DATA_REGISTRY: DataStoreRegistry = {
644            let mut reg = DataStoreRegistry::new();
645            reg.register(DATA_STORE_PATH, &TEST_DATA_STORE);
646            reg
647        };
648
649        static INDEX_REGISTRY: IndexStoreRegistry = {
650            let mut reg = IndexStoreRegistry::new();
651            reg.register(INDEX_STORE_PATH, &TEST_INDEX_STORE);
652            reg
653        };
654    }
655
656    static DB: Db<TestCanister> = Db::new(&DATA_REGISTRY, &INDEX_REGISTRY, &[]);
657
658    canic_memory::eager_init!({
659        canic_memory::ic_memory_range!(0, 60);
660    });
661
662    fn reset_stores() {
663        TEST_DATA_STORE.with_borrow_mut(DataStore::clear);
664        TEST_INDEX_STORE.with_borrow_mut(IndexStore::clear);
665    }
666
667    fn seed_entity(entity: &TestEntity) {
668        let data_key = DataKey::new::<TestEntity>(entity.id);
669        let raw_key = data_key.to_raw().expect("data key encode");
670        let raw_row = RawRow::try_new(serialize(entity).unwrap()).unwrap();
671        TEST_DATA_STORE.with_borrow_mut(|store| store.insert(raw_key, raw_row));
672
673        let index_key = IndexKey::new(entity, &INDEX_MODEL)
674            .expect("index key")
675            .expect("index key missing");
676        let raw_index_key = index_key.to_raw();
677        let entry = IndexEntry::new(entity.key());
678        let raw_entry = RawIndexEntry::try_from_entry(&entry).unwrap();
679        TEST_INDEX_STORE.with_borrow_mut(|store| store.insert(raw_index_key, raw_entry));
680    }
681
682    #[test]
683    fn unique_collision_equal_values_is_conflict() {
684        with_test_hash_override([0xAA; 16], || {
685            reset_stores();
686
687            let existing = TestEntity {
688                id: Ulid::from_u128(1),
689                tag: "alpha".to_string(),
690            };
691            seed_entity(&existing);
692
693            let incoming = TestEntity {
694                id: Ulid::from_u128(2),
695                tag: "alpha".to_string(),
696            };
697
698            let err = plan_index_mutation_for_entity::<TestEntity>(&DB, None, Some(&incoming))
699                .expect_err("expected unique conflict");
700            assert_eq!(err.class, ErrorClass::Conflict);
701        });
702    }
703
704    #[test]
705    fn unique_collision_different_values_is_corruption() {
706        with_test_hash_override([0xAA; 16], || {
707            reset_stores();
708
709            let existing = TestEntity {
710                id: Ulid::from_u128(1),
711                tag: "alpha".to_string(),
712            };
713            seed_entity(&existing);
714
715            let incoming = TestEntity {
716                id: Ulid::from_u128(2),
717                tag: "beta".to_string(),
718            };
719
720            let err = plan_index_mutation_for_entity::<TestEntity>(&DB, None, Some(&incoming))
721                .expect_err("expected hash collision");
722            assert_eq!(err.class, ErrorClass::Corruption);
723            assert_eq!(err.origin, ErrorOrigin::Index);
724        });
725    }
726
727    #[test]
728    fn unique_collision_row_key_mismatch_is_corruption() {
729        reset_stores();
730
731        let indexed = TestEntity {
732            id: Ulid::from_u128(1),
733            tag: "alpha".to_string(),
734        };
735        let corrupted = TestEntity {
736            id: Ulid::from_u128(2),
737            tag: "alpha".to_string(),
738        };
739
740        let data_key = DataKey::new::<TestEntity>(indexed.id);
741        let raw_key = data_key.to_raw().expect("data key encode");
742        let raw_row = RawRow::try_new(serialize(&corrupted).unwrap()).unwrap();
743        TEST_DATA_STORE.with_borrow_mut(|store| store.insert(raw_key, raw_row));
744
745        let index_key = IndexKey::new(&indexed, &INDEX_MODEL)
746            .expect("index key")
747            .expect("index key missing");
748        let raw_index_key = index_key.to_raw();
749        let entry = IndexEntry::new(indexed.key());
750        let raw_entry = RawIndexEntry::try_from_entry(&entry).unwrap();
751        TEST_INDEX_STORE.with_borrow_mut(|store| store.insert(raw_index_key, raw_entry));
752
753        let incoming = TestEntity {
754            id: Ulid::from_u128(3),
755            tag: "alpha".to_string(),
756        };
757
758        let err = plan_index_mutation_for_entity::<TestEntity>(&DB, None, Some(&incoming))
759            .expect_err("expected row key mismatch corruption");
760        assert_eq!(err.class, ErrorClass::Corruption);
761        assert_eq!(err.origin, ErrorOrigin::Store);
762    }
763
764    #[test]
765    fn unique_collision_stored_missing_field_is_corruption() {
766        reset_stores();
767
768        let stored = MissingFieldEntity {
769            id: Ulid::from_u128(1),
770            tag: "__missing__".to_string(),
771        };
772        let data_key = DataKey::new::<MissingFieldEntity>(stored.id);
773        let raw_key = data_key.to_raw().expect("data key encode");
774        let raw_row = RawRow::try_new(serialize(&stored).unwrap()).unwrap();
775        TEST_DATA_STORE.with_borrow_mut(|store| store.insert(raw_key, raw_row));
776
777        let incoming = MissingFieldEntity {
778            id: Ulid::from_u128(2),
779            tag: "alpha".to_string(),
780        };
781
782        let index_key = IndexKey::new(&incoming, &INDEX_MODEL)
783            .expect("index key")
784            .expect("index key missing");
785        let raw_index_key = index_key.to_raw();
786        let entry = IndexEntry::new(stored.key());
787        let raw_entry = RawIndexEntry::try_from_entry(&entry).unwrap();
788        TEST_INDEX_STORE.with_borrow_mut(|store| store.insert(raw_index_key, raw_entry));
789
790        let err = plan_index_mutation_for_entity::<MissingFieldEntity>(&DB, None, Some(&incoming))
791            .expect_err("expected missing stored field corruption");
792        assert_eq!(err.class, ErrorClass::Corruption);
793        assert_eq!(err.origin, ErrorOrigin::Index);
794    }
795}