Skip to main content

icydb_core/db/index/
plan.rs

1use crate::{
2    db::{
3        CommitIndexOp,
4        executor::ExecutorError,
5        index::{
6            IndexEntry, IndexEntryCorruption, IndexEntryEncodeError, IndexKey, IndexStore,
7            RawIndexEntry, RawIndexKey,
8        },
9        store::DataKey,
10    },
11    error::{ErrorClass, ErrorOrigin, InternalError},
12    key::Key,
13    model::index::IndexModel,
14    obs::sink::{self, MetricsEvent},
15    traits::{EntityKind, Storable},
16};
17use std::{cell::RefCell, collections::BTreeMap, thread::LocalKey};
18
///
/// IndexApplyPlan
///
/// Pairs one index definition with the thread-local store that holds its
/// entries. `plan_index_mutation_for_entity` pushes one of these for every
/// index listed in `E::INDEXES`.
///

#[derive(Debug)]
pub struct IndexApplyPlan {
    /// Static model of the index this plan entry refers to.
    pub index: &'static IndexModel,
    /// Thread-local index store, resolved from the registry via `index.store`.
    pub store: &'static LocalKey<RefCell<IndexStore>>,
}
28
///
/// IndexMutationPlan
///
/// Output of index planning for a single entity transition: the index/store
/// pairs involved (`apply`) and the raw key/value operations to commit
/// (`commit_ops`).
///

#[derive(Debug)]
pub struct IndexMutationPlan {
    /// One entry per index of the entity, in `E::INDEXES` iteration order.
    pub apply: Vec<IndexApplyPlan>,
    /// Write/delete operations accumulated across all indexes of the entity.
    pub commit_ops: Vec<CommitIndexOp>,
}
38
39/// Plan all index mutations for a single entity transition.
40///
41/// This function:
42/// - Loads existing index entries
43/// - Validates unique constraints
44/// - Computes the exact index writes/deletes required
45///
46/// All fallible work happens here. The returned plan is safe to apply
47/// infallibly after a commit marker is written.
48pub fn plan_index_mutation_for_entity<E: EntityKind>(
49    db: &crate::db::Db<E::Canister>,
50    old: Option<&E>,
51    new: Option<&E>,
52) -> Result<IndexMutationPlan, InternalError> {
53    let old_entity_key = old.map(EntityKind::key);
54    let new_entity_key = new.map(EntityKind::key);
55
56    let mut apply = Vec::with_capacity(E::INDEXES.len());
57    let mut commit_ops = Vec::new();
58
59    for index in E::INDEXES {
60        let store = db.with_index(|reg| reg.try_get_store(index.store))?;
61
62        let old_key = match old {
63            Some(entity) => IndexKey::new(entity, index)?,
64            None => None,
65        };
66        let new_key = match new {
67            Some(entity) => IndexKey::new(entity, index)?,
68            None => None,
69        };
70
71        let old_entry = load_existing_entry(store, index, old)?;
72        // Prevalidate membership so commit-phase mutations cannot surface corruption.
73        if let Some(old_key) = &old_key {
74            let Some(old_entity_key) = old_entity_key else {
75                return Err(InternalError::new(
76                    ErrorClass::Internal,
77                    ErrorOrigin::Index,
78                    "missing old entity key for index removal".to_string(),
79                ));
80            };
81            let entry = old_entry.as_ref().ok_or_else(|| {
82                ExecutorError::corruption(
83                    ErrorOrigin::Index,
84                    format!(
85                        "index corrupted: {} ({}) -> {}",
86                        E::PATH,
87                        index.fields.join(", "),
88                        IndexEntryCorruption::missing_key(old_key.to_raw(), old_entity_key)
89                    ),
90                )
91            })?;
92            if index.unique && entry.len() > 1 {
93                return Err(ExecutorError::corruption(
94                    ErrorOrigin::Index,
95                    format!(
96                        "index corrupted: {} ({}) -> {}",
97                        E::PATH,
98                        index.fields.join(", "),
99                        IndexEntryCorruption::NonUniqueEntry { keys: entry.len() }
100                    ),
101                )
102                .into());
103            }
104            if !entry.contains(&old_entity_key) {
105                return Err(ExecutorError::corruption(
106                    ErrorOrigin::Index,
107                    format!(
108                        "index corrupted: {} ({}) -> {}",
109                        E::PATH,
110                        index.fields.join(", "),
111                        IndexEntryCorruption::missing_key(old_key.to_raw(), old_entity_key)
112                    ),
113                )
114                .into());
115            }
116        }
117        let new_entry = if old_key == new_key {
118            old_entry.clone()
119        } else {
120            load_existing_entry(store, index, new)?
121        };
122
123        validate_unique_constraint::<E>(
124            db,
125            index,
126            new_entry.as_ref(),
127            new_entity_key.as_ref(),
128            new,
129        )?;
130
131        build_commit_ops_for_index::<E>(
132            &mut commit_ops,
133            index,
134            old_key,
135            new_key,
136            old_entry,
137            new_entry,
138            old_entity_key,
139            new_entity_key,
140        )?;
141
142        apply.push(IndexApplyPlan { index, store });
143    }
144
145    Ok(IndexMutationPlan { apply, commit_ops })
146}
147
148fn load_existing_entry<E: EntityKind>(
149    store: &'static LocalKey<RefCell<IndexStore>>,
150    index: &'static IndexModel,
151    entity: Option<&E>,
152) -> Result<Option<IndexEntry>, InternalError> {
153    let Some(entity) = entity else {
154        return Ok(None);
155    };
156    let Some(key) = IndexKey::new(entity, index)? else {
157        return Ok(None);
158    };
159
160    store
161        .with_borrow(|s| s.get(&key.to_raw()))
162        .map(|raw| {
163            raw.try_decode().map_err(|err| {
164                ExecutorError::corruption(
165                    ErrorOrigin::Index,
166                    format!(
167                        "index corrupted: {} ({}) -> {}",
168                        E::PATH,
169                        index.fields.join(", "),
170                        err
171                    ),
172                )
173                .into()
174            })
175        })
176        .transpose()
177}
178
179/// Validate unique index constraints against the existing index entry.
180///
181/// This detects:
182/// - Index corruption (multiple keys in a unique entry)
183/// - Uniqueness violations (conflicting key ownership)
184fn validate_unique_constraint<E: EntityKind>(
185    db: &crate::db::Db<E::Canister>,
186    index: &IndexModel,
187    entry: Option<&IndexEntry>,
188    new_key: Option<&Key>,
189    new_entity: Option<&E>,
190) -> Result<(), InternalError> {
191    if !index.unique {
192        return Ok(());
193    }
194
195    let Some(entry) = entry else {
196        return Ok(());
197    };
198
199    if entry.len() > 1 {
200        return Err(ExecutorError::corruption(
201            ErrorOrigin::Index,
202            format!(
203                "index corrupted: {} ({}) -> {} keys",
204                E::PATH,
205                index.fields.join(", "),
206                entry.len()
207            ),
208        )
209        .into());
210    }
211
212    let Some(new_key) = new_key else {
213        return Ok(());
214    };
215    if entry.contains(new_key) {
216        return Ok(());
217    }
218
219    let Some(new_entity) = new_entity else {
220        return Err(InternalError::new(
221            ErrorClass::InvariantViolation,
222            ErrorOrigin::Index,
223            "missing entity payload during unique validation".to_string(),
224        ));
225    };
226    let existing_key = entry.single_key().ok_or_else(|| {
227        ExecutorError::corruption(
228            ErrorOrigin::Index,
229            format!(
230                "index corrupted: {} ({}) -> {} keys",
231                E::PATH,
232                index.fields.join(", "),
233                entry.len()
234            ),
235        )
236    })?;
237
238    let stored = {
239        let data_key = DataKey::new::<E>(existing_key);
240        let row = db.context::<E>().read_strict(&data_key)?;
241        row.try_decode::<E>().map_err(|err| {
242            ExecutorError::corruption(
243                ErrorOrigin::Serialize,
244                format!("failed to deserialize row: {data_key} ({err})"),
245            )
246        })?
247    };
248
249    for field in index.fields {
250        let expected = new_entity.get_value(field).ok_or_else(|| {
251            InternalError::new(
252                ErrorClass::InvariantViolation,
253                ErrorOrigin::Index,
254                format!(
255                    "index field missing on lookup entity: {} ({})",
256                    E::PATH,
257                    field
258                ),
259            )
260        })?;
261        let actual = stored.get_value(field).ok_or_else(|| {
262            InternalError::new(
263                ErrorClass::InvariantViolation,
264                ErrorOrigin::Index,
265                format!(
266                    "index field missing on stored entity: {} ({})",
267                    E::PATH,
268                    field
269                ),
270            )
271        })?;
272
273        if expected != actual {
274            return Err(ExecutorError::corruption(
275                ErrorOrigin::Index,
276                format!("index hash collision: {} ({})", E::PATH, field),
277            )
278            .into());
279        }
280    }
281
282    sink::record(MetricsEvent::UniqueViolation {
283        entity_path: E::PATH,
284    });
285
286    Err(ExecutorError::index_violation(E::PATH, index.fields).into())
287}
288
289/// Compute commit-time index operations for a single index.
290///
291/// Produces a minimal set of index updates:
292/// - `Some(bytes)` → insert/update index entry
293/// - `None`        → delete index entry
294///
295/// Correctly handles old/new key overlap and guarantees that
296/// apply-time mutations cannot fail except by invariant violation.
297#[allow(clippy::too_many_arguments)]
298fn build_commit_ops_for_index<E: EntityKind>(
299    commit_ops: &mut Vec<CommitIndexOp>,
300    index: &'static IndexModel,
301    old_key: Option<IndexKey>,
302    new_key: Option<IndexKey>,
303    old_entry: Option<IndexEntry>,
304    new_entry: Option<IndexEntry>,
305    old_entity_key: Option<Key>,
306    new_entity_key: Option<Key>,
307) -> Result<(), InternalError> {
308    let mut touched: BTreeMap<RawIndexKey, Option<IndexEntry>> = BTreeMap::new();
309    let fields = index.fields.join(", ");
310
311    // ── Removal ────────────────────────────────
312
313    if let Some(old_key) = old_key {
314        let Some(old_entity_key) = old_entity_key else {
315            return Err(InternalError::new(
316                ErrorClass::Internal,
317                ErrorOrigin::Index,
318                "missing old entity key for index removal".to_string(),
319            ));
320        };
321
322        if let Some(mut entry) = old_entry {
323            entry.remove_key(&old_entity_key);
324            let after = if entry.is_empty() { None } else { Some(entry) };
325            touched.insert(old_key.to_raw(), after);
326        } else {
327            // No existing index entry -> nothing to remove.
328            touched.insert(old_key.to_raw(), None);
329        }
330    }
331
332    // ── Insertion ──────────────────────────────
333
334    if let Some(new_key) = new_key {
335        let Some(new_entity_key) = new_entity_key else {
336            return Err(InternalError::new(
337                ErrorClass::Internal,
338                ErrorOrigin::Index,
339                "missing new entity key for index insertion".to_string(),
340            ));
341        };
342
343        let raw_key = new_key.to_raw();
344
345        // Start from:
346        //   1. result of removal (if same key)
347        //   2. existing entry loaded from store
348        //   3. brand new entry
349        let mut entry = if let Some(existing) = touched.remove(&raw_key) {
350            existing.unwrap_or_else(|| IndexEntry::new(new_entity_key))
351        } else if let Some(existing) = new_entry {
352            existing
353        } else {
354            IndexEntry::new(new_entity_key)
355        };
356
357        entry.insert_key(new_entity_key);
358        touched.insert(raw_key, Some(entry));
359    }
360
361    // ── Emit commit ops ────────────────────────
362
363    for (raw_key, entry) in touched {
364        let value = if let Some(entry) = entry {
365            let raw = RawIndexEntry::try_from_entry(&entry).map_err(|err| match err {
366                IndexEntryEncodeError::TooManyKeys { keys } => InternalError::new(
367                    ErrorClass::Unsupported,
368                    ErrorOrigin::Index,
369                    format!(
370                        "index entry exceeds max keys: {} ({}) -> {} keys",
371                        E::PATH,
372                        fields,
373                        keys
374                    ),
375                ),
376                IndexEntryEncodeError::KeyEncoding(err) => InternalError::new(
377                    ErrorClass::Unsupported,
378                    ErrorOrigin::Index,
379                    format!(
380                        "index entry key encoding failed: {} ({}) -> {err}",
381                        E::PATH,
382                        fields
383                    ),
384                ),
385            })?;
386            Some(raw.into_bytes())
387        } else {
388            None
389        };
390
391        commit_ops.push(CommitIndexOp {
392            store: index.store.to_string(),
393            key: raw_key.as_bytes().to_vec(),
394            value,
395        });
396    }
397
398    Ok(())
399}
400
401/// Load and decode the current index entry for an entity, if one exists.
402///
403/// This is a *planning-phase* helper used during index mutation prevalidation.
404/// It performs a read-only lookup of the index store and attempts to decode
405/// the raw entry into an `IndexEntry`.
406///
407/// Semantics:
408/// - Returns `Ok(None)` if:
409///   - the entity does not participate in this index, or
410///   - no index entry exists for the computed index key.
411/// - Returns `Ok(Some(IndexEntry))` if a valid entry is present.
412/// - Returns `Err` if the raw index data exists but cannot be decoded.
413///
414/// Error handling:
415/// - Decode failures are treated as *index corruption*, not user error.
416///   Such corruption must be detected *before* any commit marker is written,
417///   ensuring no partial mutations occur.
418///
419/// Atomicity rationale:
420/// This function is intentionally fallible and must only be called during the
421/// prevalidation phase. Once the commit marker is persisted, all index/data
422/// mutations are assumed infallible (or trap), so any corruption must surface
423/// here to preserve Stage-2 atomicity guarantees.
424pub fn load_existing_index_entry<E: EntityKind>(
425    store: &'static LocalKey<RefCell<IndexStore>>,
426    index: &'static IndexModel,
427    entity: Option<&E>,
428) -> Result<Option<IndexEntry>, InternalError> {
429    let Some(entity) = entity else {
430        return Ok(None);
431    };
432    let Some(key) = IndexKey::new(entity, index)? else {
433        return Ok(None);
434    };
435
436    store
437        .with_borrow(|s| s.get(&key.to_raw()))
438        .map(|raw| {
439            raw.try_decode().map_err(|err| {
440                ExecutorError::corruption(
441                    ErrorOrigin::Index,
442                    format!(
443                        "index corrupted: {} ({}) -> {}",
444                        E::PATH,
445                        index.fields.join(", "),
446                        err
447                    ),
448                )
449                .into()
450            })
451        })
452        .transpose()
453}
454
// Unit tests for unique-index planning. They define a minimal entity with a
// single-field index on `tag`, backed by dedicated test data/index stores.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        db::{
            Db,
            index::IndexStoreRegistry,
            index::fingerprint::with_test_hash_override,
            store::{DataStore, DataStoreRegistry, RawRow},
        },
        error::{ErrorClass, ErrorOrigin},
        model::{
            entity::EntityModel,
            field::{EntityFieldKind, EntityFieldModel},
            index::IndexModel,
        },
        serialize::serialize,
        traits::{
            CanisterKind, EntityKind, FieldValues, Path, SanitizeAuto, SanitizeCustom, StoreKind,
            ValidateAuto, ValidateCustom, View, ViewError, Visitable,
        },
        types::Ulid,
        value::Value,
    };
    use serde::{Deserialize, Serialize};
    use std::cell::RefCell;

    // Path identifiers used to register and look up the test fixtures.
    const CANISTER_PATH: &str = "index_plan_test::TestCanister";
    const DATA_STORE_PATH: &str = "index_plan_test::TestDataStore";
    const INDEX_STORE_PATH: &str = "index_plan_test::TestIndexStore";
    const ENTITY_PATH: &str = "index_plan_test::TestEntity";

    // Single index over the `tag` field.
    // NOTE(review): the trailing `true` presumably marks the index as unique
    // (the tests below exercise unique-constraint handling) — confirm against
    // `IndexModel::new`.
    const INDEX_FIELDS: [&str; 1] = ["tag"];
    const INDEX_MODEL: IndexModel = IndexModel::new(
        "index_plan_test::idx_tag",
        INDEX_STORE_PATH,
        &INDEX_FIELDS,
        true,
    );
    const INDEXES: [&IndexModel; 1] = [&INDEX_MODEL];

    // Field and entity models describing `TestEntity` (`id` is the primary key).
    const TEST_FIELDS: [EntityFieldModel; 2] = [
        EntityFieldModel {
            name: "id",
            kind: EntityFieldKind::Ulid,
        },
        EntityFieldModel {
            name: "tag",
            kind: EntityFieldKind::Text,
        },
    ];
    const TEST_MODEL: EntityModel = EntityModel {
        path: ENTITY_PATH,
        entity_name: "TestEntity",
        primary_key: &TEST_FIELDS[0],
        fields: &TEST_FIELDS,
        indexes: &INDEXES,
    };

    /// Minimal entity: a ULID primary key plus one indexed text field.
    #[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
    struct TestEntity {
        id: Ulid,
        tag: String,
    }

    impl Path for TestEntity {
        const PATH: &'static str = ENTITY_PATH;
    }

    // The entity acts as its own view type.
    impl View for TestEntity {
        type ViewType = Self;

        fn to_view(&self) -> Self::ViewType {
            self.clone()
        }

        fn from_view(view: Self::ViewType) -> Result<Self, ViewError> {
            Ok(view)
        }
    }

    // Empty impls: the trait defaults are used as-is.
    impl SanitizeAuto for TestEntity {}
    impl SanitizeCustom for TestEntity {}
    impl ValidateAuto for TestEntity {}
    impl ValidateCustom for TestEntity {}
    impl Visitable for TestEntity {}

    // Field lookup by name; unknown field names yield `None`.
    impl FieldValues for TestEntity {
        fn get_value(&self, field: &str) -> Option<Value> {
            match field {
                "id" => Some(Value::Ulid(self.id)),
                "tag" => Some(Value::Text(self.tag.clone())),
                _ => None,
            }
        }
    }

    /// Marker canister type that owns the test stores.
    #[derive(Clone, Copy)]
    struct TestCanister;

    impl Path for TestCanister {
        const PATH: &'static str = CANISTER_PATH;
    }

    impl CanisterKind for TestCanister {}

    /// Marker store type naming the data store used by `TestEntity`.
    struct TestStore;

    impl Path for TestStore {
        const PATH: &'static str = DATA_STORE_PATH;
    }

    impl StoreKind for TestStore {
        type Canister = TestCanister;
    }

    impl EntityKind for TestEntity {
        type PrimaryKey = Ulid;
        type Store = TestStore;
        type Canister = TestCanister;

        const ENTITY_NAME: &'static str = "TestEntity";
        const PRIMARY_KEY: &'static str = "id";
        const FIELDS: &'static [&'static str] = &["id", "tag"];
        const INDEXES: &'static [&'static IndexModel] = &INDEXES;
        const MODEL: &'static EntityModel = &TEST_MODEL;

        fn key(&self) -> crate::key::Key {
            self.id.into()
        }

        fn primary_key(&self) -> Self::PrimaryKey {
            self.id
        }

        fn set_primary_key(&mut self, key: Self::PrimaryKey) {
            self.id = key;
        }
    }

    // Backing stores for the test.
    // NOTE(review): 20 and 21 look like memory ids inside the range reserved
    // by `ic_memory_range!(0, 60)` below — confirm against `canic_memory`.
    canic_memory::eager_static! {
        static TEST_DATA_STORE: RefCell<DataStore> =
            RefCell::new(DataStore::init(canic_memory::ic_memory!(DataStore, 20)));
    }

    canic_memory::eager_static! {
        static TEST_INDEX_STORE: RefCell<IndexStore> =
            RefCell::new(IndexStore::init(canic_memory::ic_memory!(IndexStore, 21)));
    }

    // Registries mapping the store paths above to the test stores.
    thread_local! {
        static DATA_REGISTRY: DataStoreRegistry = {
            let mut reg = DataStoreRegistry::new();
            reg.register(DATA_STORE_PATH, &TEST_DATA_STORE);
            reg
        };

        static INDEX_REGISTRY: IndexStoreRegistry = {
            let mut reg = IndexStoreRegistry::new();
            reg.register(INDEX_STORE_PATH, &TEST_INDEX_STORE);
            reg
        };
    }

    // Database handle wired to the two registries; passed to the planner.
    static DB: Db<TestCanister> = Db::new(&DATA_REGISTRY, &INDEX_REGISTRY);

    canic_memory::eager_init!({
        canic_memory::ic_memory_range!(0, 60);
    });

    /// Empty both stores so each test starts from a clean state.
    fn reset_stores() {
        TEST_DATA_STORE.with_borrow_mut(|store| store.clear());
        TEST_INDEX_STORE.with_borrow_mut(|store| store.clear());
    }

    /// Persist `entity` as a data row and as the sole key of its index entry.
    fn seed_entity(entity: &TestEntity) {
        // Data row.
        let data_key = DataKey::new::<TestEntity>(entity.id);
        let raw_key = data_key.to_raw().expect("data key encode");
        let raw_row = RawRow::try_new(serialize(entity).unwrap()).unwrap();
        TEST_DATA_STORE.with_borrow_mut(|store| store.insert(raw_key, raw_row));

        // Index entry containing only this entity's key.
        let index_key = IndexKey::new(entity, &INDEX_MODEL)
            .expect("index key")
            .expect("index key missing");
        let raw_index_key = index_key.to_raw();
        let entry = IndexEntry::new(entity.key());
        let raw_entry = RawIndexEntry::try_from_entry(&entry).unwrap();
        TEST_INDEX_STORE.with_borrow_mut(|store| store.insert(raw_index_key, raw_entry));
    }

    // A second entity with the same `tag` as a stored one must be rejected
    // with a conflict-class error.
    #[test]
    fn unique_collision_equal_values_is_conflict() {
        // NOTE(review): the hash override presumably pins the index
        // fingerprint so both entities map to the same index key — confirm
        // against `with_test_hash_override`.
        with_test_hash_override([0xAA; 16], || {
            reset_stores();

            let existing = TestEntity {
                id: Ulid::from_u128(1),
                tag: "alpha".to_string(),
            };
            seed_entity(&existing);

            let incoming = TestEntity {
                id: Ulid::from_u128(2),
                tag: "alpha".to_string(),
            };

            let err = plan_index_mutation_for_entity::<TestEntity>(&DB, None, Some(&incoming))
                .expect_err("expected unique conflict");
            assert_eq!(err.class, ErrorClass::Conflict);
        });
    }

    // Same setup, but the incoming `tag` differs from the stored one; the
    // planner is expected to report corruption originating from the index.
    #[test]
    fn unique_collision_different_values_is_corruption() {
        with_test_hash_override([0xAA; 16], || {
            reset_stores();

            let existing = TestEntity {
                id: Ulid::from_u128(1),
                tag: "alpha".to_string(),
            };
            seed_entity(&existing);

            let incoming = TestEntity {
                id: Ulid::from_u128(2),
                tag: "beta".to_string(),
            };

            let err = plan_index_mutation_for_entity::<TestEntity>(&DB, None, Some(&incoming))
                .expect_err("expected hash collision");
            assert_eq!(err.class, ErrorClass::Corruption);
            assert_eq!(err.origin, ErrorOrigin::Index);
        });
    }
}