// icydb_core/db/index/plan.rs

1use crate::{
2    db::{
3        CommitIndexOp,
4        executor::ExecutorError,
5        index::{
6            IndexEntry, IndexEntryCorruption, IndexEntryEncodeError, IndexKey, IndexStore,
7            RawIndexEntry, RawIndexKey,
8        },
9        store::DataKey,
10    },
11    error::{ErrorClass, ErrorOrigin, InternalError},
12    key::Key,
13    model::index::IndexModel,
14    obs::sink::{self, MetricsEvent},
15    traits::{EntityKind, Storable},
16};
17use std::{cell::RefCell, collections::BTreeMap, thread::LocalKey};
18
19///
20/// IndexApplyPlan
21///
22
23#[derive(Debug)]
24pub struct IndexApplyPlan {
25    pub index: &'static IndexModel,
26    pub store: &'static LocalKey<RefCell<IndexStore>>,
27}
28
29///
30/// IndexMutationPlan
31///
32
33#[derive(Debug)]
34pub struct IndexMutationPlan {
35    pub apply: Vec<IndexApplyPlan>,
36    pub commit_ops: Vec<CommitIndexOp>,
37}
38
39/// Plan all index mutations for a single entity transition.
40///
41/// This function:
42/// - Loads existing index entries
43/// - Validates unique constraints
44/// - Computes the exact index writes/deletes required
45///
46/// All fallible work happens here. The returned plan is safe to apply
47/// infallibly after a commit marker is written.
48pub fn plan_index_mutation_for_entity<E: EntityKind>(
49    db: &crate::db::Db<E::Canister>,
50    old: Option<&E>,
51    new: Option<&E>,
52) -> Result<IndexMutationPlan, InternalError> {
53    let old_entity_key = old.map(EntityKind::key);
54    let new_entity_key = new.map(EntityKind::key);
55
56    let mut apply = Vec::with_capacity(E::INDEXES.len());
57    let mut commit_ops = Vec::new();
58
59    for index in E::INDEXES {
60        let store = db.with_index(|reg| reg.try_get_store(index.store))?;
61
62        let old_key = match old {
63            Some(entity) => IndexKey::new(entity, index)?,
64            None => None,
65        };
66        let new_key = match new {
67            Some(entity) => IndexKey::new(entity, index)?,
68            None => None,
69        };
70
71        let old_entry = load_existing_entry(store, index, old)?;
72        // Prevalidate membership so commit-phase mutations cannot surface corruption.
73        if let Some(old_key) = &old_key {
74            let Some(old_entity_key) = old_entity_key else {
75                return Err(InternalError::new(
76                    ErrorClass::Internal,
77                    ErrorOrigin::Index,
78                    "missing old entity key for index removal".to_string(),
79                ));
80            };
81            let entry = old_entry.as_ref().ok_or_else(|| {
82                ExecutorError::corruption(
83                    ErrorOrigin::Index,
84                    format!(
85                        "index corrupted: {} ({}) -> {}",
86                        E::PATH,
87                        index.fields.join(", "),
88                        IndexEntryCorruption::missing_key(old_key.to_raw(), old_entity_key)
89                    ),
90                )
91            })?;
92            if index.unique && entry.len() > 1 {
93                return Err(ExecutorError::corruption(
94                    ErrorOrigin::Index,
95                    format!(
96                        "index corrupted: {} ({}) -> {}",
97                        E::PATH,
98                        index.fields.join(", "),
99                        IndexEntryCorruption::NonUniqueEntry { keys: entry.len() }
100                    ),
101                )
102                .into());
103            }
104            if !entry.contains(&old_entity_key) {
105                return Err(ExecutorError::corruption(
106                    ErrorOrigin::Index,
107                    format!(
108                        "index corrupted: {} ({}) -> {}",
109                        E::PATH,
110                        index.fields.join(", "),
111                        IndexEntryCorruption::missing_key(old_key.to_raw(), old_entity_key)
112                    ),
113                )
114                .into());
115            }
116        }
117        let new_entry = if old_key == new_key {
118            old_entry.clone()
119        } else {
120            load_existing_entry(store, index, new)?
121        };
122
123        validate_unique_constraint::<E>(
124            db,
125            index,
126            new_entry.as_ref(),
127            new_entity_key.as_ref(),
128            new,
129        )?;
130
131        build_commit_ops_for_index::<E>(
132            &mut commit_ops,
133            index,
134            old_key,
135            new_key,
136            old_entry,
137            new_entry,
138            old_entity_key,
139            new_entity_key,
140        )?;
141
142        apply.push(IndexApplyPlan { index, store });
143    }
144
145    Ok(IndexMutationPlan { apply, commit_ops })
146}
147
148fn load_existing_entry<E: EntityKind>(
149    store: &'static LocalKey<RefCell<IndexStore>>,
150    index: &'static IndexModel,
151    entity: Option<&E>,
152) -> Result<Option<IndexEntry>, InternalError> {
153    let Some(entity) = entity else {
154        return Ok(None);
155    };
156    let Some(key) = IndexKey::new(entity, index)? else {
157        return Ok(None);
158    };
159
160    store
161        .with_borrow(|s| s.get(&key.to_raw()))
162        .map(|raw| {
163            raw.try_decode().map_err(|err| {
164                ExecutorError::corruption(
165                    ErrorOrigin::Index,
166                    format!(
167                        "index corrupted: {} ({}) -> {}",
168                        E::PATH,
169                        index.fields.join(", "),
170                        err
171                    ),
172                )
173                .into()
174            })
175        })
176        .transpose()
177}
178
179/// Validate unique index constraints against the existing index entry.
180///
181/// This detects:
182/// - Index corruption (multiple keys in a unique entry)
183/// - Uniqueness violations (conflicting key ownership)
184#[expect(clippy::too_many_lines)]
185fn validate_unique_constraint<E: EntityKind>(
186    db: &crate::db::Db<E::Canister>,
187    index: &IndexModel,
188    entry: Option<&IndexEntry>,
189    new_key: Option<&Key>,
190    new_entity: Option<&E>,
191) -> Result<(), InternalError> {
192    if !index.unique {
193        return Ok(());
194    }
195
196    let Some(entry) = entry else {
197        return Ok(());
198    };
199
200    if entry.len() > 1 {
201        return Err(ExecutorError::corruption(
202            ErrorOrigin::Index,
203            format!(
204                "index corrupted: {} ({}) -> {} keys",
205                E::PATH,
206                index.fields.join(", "),
207                entry.len()
208            ),
209        )
210        .into());
211    }
212
213    let Some(new_key) = new_key else {
214        return Ok(());
215    };
216    if entry.contains(new_key) {
217        return Ok(());
218    }
219
220    let Some(new_entity) = new_entity else {
221        return Err(InternalError::new(
222            ErrorClass::InvariantViolation,
223            ErrorOrigin::Index,
224            "missing entity payload during unique validation".to_string(),
225        ));
226    };
227    let existing_key = entry.single_key().ok_or_else(|| {
228        ExecutorError::corruption(
229            ErrorOrigin::Index,
230            format!(
231                "index corrupted: {} ({}) -> {} keys",
232                E::PATH,
233                index.fields.join(", "),
234                entry.len()
235            ),
236        )
237    })?;
238
239    let stored = {
240        let data_key = DataKey::new::<E>(existing_key);
241        let row = db.context::<E>().read_strict(&data_key)?;
242        row.try_decode::<E>().map_err(|err| {
243            ExecutorError::corruption(
244                ErrorOrigin::Serialize,
245                format!("failed to deserialize row: {data_key} ({err})"),
246            )
247        })?
248    };
249    let stored_key = stored.key();
250    if stored_key != existing_key {
251        // Stored row decoded successfully but key mismatch indicates index/data divergence; treat as corruption.
252        return Err(ExecutorError::corruption(
253            ErrorOrigin::Index,
254            format!(
255                "index corrupted: {} ({}) -> {}",
256                E::PATH,
257                index.fields.join(", "),
258                IndexEntryCorruption::RowKeyMismatch {
259                    indexed_key: Box::new(existing_key),
260                    row_key: Box::new(stored_key),
261                }
262            ),
263        )
264        .into());
265    }
266
267    for field in index.fields {
268        let expected = new_entity.get_value(field).ok_or_else(|| {
269            InternalError::new(
270                ErrorClass::InvariantViolation,
271                ErrorOrigin::Index,
272                format!(
273                    "index field missing on lookup entity: {} ({})",
274                    E::PATH,
275                    field
276                ),
277            )
278        })?;
279        let actual = stored.get_value(field).ok_or_else(|| {
280            InternalError::new(
281                ErrorClass::InvariantViolation,
282                ErrorOrigin::Index,
283                format!(
284                    "index field missing on stored entity: {} ({})",
285                    E::PATH,
286                    field
287                ),
288            )
289        })?;
290
291        if expected != actual {
292            return Err(ExecutorError::corruption(
293                ErrorOrigin::Index,
294                format!("index hash collision: {} ({})", E::PATH, field),
295            )
296            .into());
297        }
298    }
299
300    sink::record(MetricsEvent::UniqueViolation {
301        entity_path: E::PATH,
302    });
303
304    Err(ExecutorError::index_violation(E::PATH, index.fields).into())
305}
306
307/// Compute commit-time index operations for a single index.
308///
309/// Produces a minimal set of index updates:
310/// - `Some(bytes)` → insert/update index entry
311/// - `None`        → delete index entry
312///
313/// Correctly handles old/new key overlap and guarantees that
314/// apply-time mutations cannot fail except by invariant violation.
315#[allow(clippy::too_many_arguments)]
316fn build_commit_ops_for_index<E: EntityKind>(
317    commit_ops: &mut Vec<CommitIndexOp>,
318    index: &'static IndexModel,
319    old_key: Option<IndexKey>,
320    new_key: Option<IndexKey>,
321    old_entry: Option<IndexEntry>,
322    new_entry: Option<IndexEntry>,
323    old_entity_key: Option<Key>,
324    new_entity_key: Option<Key>,
325) -> Result<(), InternalError> {
326    let mut touched: BTreeMap<RawIndexKey, Option<IndexEntry>> = BTreeMap::new();
327    let fields = index.fields.join(", ");
328
329    // ── Removal ────────────────────────────────
330
331    if let Some(old_key) = old_key {
332        let Some(old_entity_key) = old_entity_key else {
333            return Err(InternalError::new(
334                ErrorClass::Internal,
335                ErrorOrigin::Index,
336                "missing old entity key for index removal".to_string(),
337            ));
338        };
339
340        if let Some(mut entry) = old_entry {
341            entry.remove_key(&old_entity_key);
342            let after = if entry.is_empty() { None } else { Some(entry) };
343            touched.insert(old_key.to_raw(), after);
344        } else {
345            // No existing index entry -> nothing to remove.
346            touched.insert(old_key.to_raw(), None);
347        }
348    }
349
350    // ── Insertion ──────────────────────────────
351
352    if let Some(new_key) = new_key {
353        let Some(new_entity_key) = new_entity_key else {
354            return Err(InternalError::new(
355                ErrorClass::Internal,
356                ErrorOrigin::Index,
357                "missing new entity key for index insertion".to_string(),
358            ));
359        };
360
361        let raw_key = new_key.to_raw();
362
363        // Start from:
364        //   1. result of removal (if same key)
365        //   2. existing entry loaded from store
366        //   3. brand new entry
367        let mut entry = if let Some(existing) = touched.remove(&raw_key) {
368            existing.unwrap_or_else(|| IndexEntry::new(new_entity_key))
369        } else if let Some(existing) = new_entry {
370            existing
371        } else {
372            IndexEntry::new(new_entity_key)
373        };
374
375        entry.insert_key(new_entity_key);
376        touched.insert(raw_key, Some(entry));
377    }
378
379    // ── Emit commit ops ────────────────────────
380
381    for (raw_key, entry) in touched {
382        let value = if let Some(entry) = entry {
383            let raw = RawIndexEntry::try_from_entry(&entry).map_err(|err| match err {
384                IndexEntryEncodeError::TooManyKeys { keys } => InternalError::new(
385                    ErrorClass::Unsupported,
386                    ErrorOrigin::Index,
387                    format!(
388                        "index entry exceeds max keys: {} ({}) -> {} keys",
389                        E::PATH,
390                        fields,
391                        keys
392                    ),
393                ),
394                IndexEntryEncodeError::KeyEncoding(err) => InternalError::new(
395                    ErrorClass::Unsupported,
396                    ErrorOrigin::Index,
397                    format!(
398                        "index entry key encoding failed: {} ({}) -> {err}",
399                        E::PATH,
400                        fields
401                    ),
402                ),
403            })?;
404            Some(raw.into_bytes())
405        } else {
406            None
407        };
408
409        commit_ops.push(CommitIndexOp {
410            store: index.store.to_string(),
411            key: raw_key.as_bytes().to_vec(),
412            value,
413        });
414    }
415
416    Ok(())
417}
418
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        db::{
            Db,
            index::IndexStoreRegistry,
            index::fingerprint::with_test_hash_override,
            store::{DataStore, DataStoreRegistry, RawRow},
        },
        error::{ErrorClass, ErrorOrigin},
        model::{
            entity::EntityModel,
            field::{EntityFieldKind, EntityFieldModel},
            index::IndexModel,
        },
        serialize::serialize,
        traits::{
            CanisterKind, EntityKind, FieldValues, Path, SanitizeAuto, SanitizeCustom, StoreKind,
            ValidateAuto, ValidateCustom, View, Visitable,
        },
        types::Ulid,
        value::Value,
    };
    use serde::{Deserialize, Serialize};
    use std::cell::RefCell;

    const CANISTER_PATH: &str = "index_plan_test::TestCanister";
    const DATA_STORE_PATH: &str = "index_plan_test::TestDataStore";
    const INDEX_STORE_PATH: &str = "index_plan_test::TestIndexStore";
    const ENTITY_PATH: &str = "index_plan_test::TestEntity";

    const INDEX_FIELDS: [&str; 1] = ["tag"];
    const INDEX_MODEL: IndexModel = IndexModel::new(
        "index_plan_test::idx_tag",
        INDEX_STORE_PATH,
        &INDEX_FIELDS,
        true,
    );
    const INDEXES: [&IndexModel; 1] = [&INDEX_MODEL];

    const TEST_FIELDS: [EntityFieldModel; 2] = [
        EntityFieldModel {
            name: "id",
            kind: EntityFieldKind::Ulid,
        },
        EntityFieldModel {
            name: "tag",
            kind: EntityFieldKind::Text,
        },
    ];
    const TEST_MODEL: EntityModel = EntityModel {
        path: ENTITY_PATH,
        entity_name: "TestEntity",
        primary_key: &TEST_FIELDS[0],
        fields: &TEST_FIELDS,
        indexes: &INDEXES,
    };

    #[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
    struct TestEntity {
        id: Ulid,
        tag: String,
    }

    impl Path for TestEntity {
        const PATH: &'static str = ENTITY_PATH;
    }

    impl View for TestEntity {
        type ViewType = Self;

        fn to_view(&self) -> Self::ViewType {
            self.clone()
        }

        fn from_view(view: Self::ViewType) -> Self {
            view
        }
    }

    impl SanitizeAuto for TestEntity {}
    impl SanitizeCustom for TestEntity {}
    impl ValidateAuto for TestEntity {}
    impl ValidateCustom for TestEntity {}
    impl Visitable for TestEntity {}

    impl FieldValues for TestEntity {
        fn get_value(&self, field: &str) -> Option<Value> {
            match field {
                "id" => Some(Value::Ulid(self.id)),
                "tag" => Some(Value::Text(self.tag.clone())),
                _ => None,
            }
        }
    }

    #[derive(Clone, Copy)]
    struct TestCanister;

    impl Path for TestCanister {
        const PATH: &'static str = CANISTER_PATH;
    }

    impl CanisterKind for TestCanister {}

    struct TestStore;

    impl Path for TestStore {
        const PATH: &'static str = DATA_STORE_PATH;
    }

    impl StoreKind for TestStore {
        type Canister = TestCanister;
    }

    impl EntityKind for TestEntity {
        type PrimaryKey = Ulid;
        type Store = TestStore;
        type Canister = TestCanister;

        const ENTITY_NAME: &'static str = "TestEntity";
        const PRIMARY_KEY: &'static str = "id";
        const FIELDS: &'static [&'static str] = &["id", "tag"];
        const INDEXES: &'static [&'static IndexModel] = &INDEXES;
        const MODEL: &'static EntityModel = &TEST_MODEL;

        fn key(&self) -> crate::key::Key {
            self.id.into()
        }

        fn primary_key(&self) -> Self::PrimaryKey {
            self.id
        }

        fn set_primary_key(&mut self, key: Self::PrimaryKey) {
            self.id = key;
        }
    }

    canic_memory::eager_static! {
        static TEST_DATA_STORE: RefCell<DataStore> =
            RefCell::new(DataStore::init(canic_memory::ic_memory!(DataStore, 20)));
    }

    canic_memory::eager_static! {
        static TEST_INDEX_STORE: RefCell<IndexStore> =
            RefCell::new(IndexStore::init(canic_memory::ic_memory!(IndexStore, 21)));
    }

    thread_local! {
        static DATA_REGISTRY: DataStoreRegistry = {
            let mut reg = DataStoreRegistry::new();
            reg.register(DATA_STORE_PATH, &TEST_DATA_STORE);
            reg
        };

        static INDEX_REGISTRY: IndexStoreRegistry = {
            let mut reg = IndexStoreRegistry::new();
            reg.register(INDEX_STORE_PATH, &TEST_INDEX_STORE);
            reg
        };
    }

    static DB: Db<TestCanister> = Db::new(&DATA_REGISTRY, &INDEX_REGISTRY);

    canic_memory::eager_init!({
        canic_memory::ic_memory_range!(0, 60);
    });

    fn reset_stores() {
        TEST_DATA_STORE.with_borrow_mut(|store| store.clear());
        TEST_INDEX_STORE.with_borrow_mut(|store| store.clear());
    }

    /// Build a test entity from a numeric id and a tag.
    fn tagged(id: u128, tag: &str) -> TestEntity {
        TestEntity {
            id: Ulid::from_u128(id),
            tag: tag.to_string(),
        }
    }

    /// Raw key that `entity` maps to under the test index.
    fn raw_index_key(entity: &TestEntity) -> RawIndexKey {
        IndexKey::new(entity, &INDEX_MODEL)
            .expect("index key")
            .expect("index key missing")
            .to_raw()
    }

    /// Encoded bytes of a single-key index entry pointing at `entity`.
    fn single_key_entry_bytes(entity: &TestEntity) -> Vec<u8> {
        RawIndexEntry::try_from_entry(&IndexEntry::new(entity.key()))
            .unwrap()
            .into_bytes()
    }

    /// Store `row` in the data store under `owner`'s primary key.
    fn insert_row(owner: &TestEntity, row: &TestEntity) {
        let data_key = DataKey::new::<TestEntity>(owner.id);
        let raw_key = data_key.to_raw().expect("data key encode");
        let raw_row = RawRow::try_new(serialize(row).unwrap()).unwrap();
        TEST_DATA_STORE.with_borrow_mut(|store| store.insert(raw_key, raw_row));
    }

    /// Store a single-key index entry for `entity` under its index key.
    fn insert_index_entry(entity: &TestEntity) {
        let raw_key = raw_index_key(entity);
        let raw_entry = RawIndexEntry::try_from_entry(&IndexEntry::new(entity.key())).unwrap();
        TEST_INDEX_STORE.with_borrow_mut(|store| store.insert(raw_key, raw_entry));
    }

    /// Seed a consistent row + index entry pair for `entity`.
    fn seed_entity(entity: &TestEntity) {
        insert_row(entity, entity);
        insert_index_entry(entity);
    }

    #[test]
    fn unique_collision_equal_values_is_conflict() {
        with_test_hash_override([0xAA; 16], || {
            reset_stores();
            seed_entity(&tagged(1, "alpha"));

            let incoming = tagged(2, "alpha");
            let err = plan_index_mutation_for_entity::<TestEntity>(&DB, None, Some(&incoming))
                .expect_err("expected unique conflict");
            assert_eq!(err.class, ErrorClass::Conflict);
        });
    }

    #[test]
    fn unique_collision_different_values_is_corruption() {
        with_test_hash_override([0xAA; 16], || {
            reset_stores();
            seed_entity(&tagged(1, "alpha"));

            let incoming = tagged(2, "beta");
            let err = plan_index_mutation_for_entity::<TestEntity>(&DB, None, Some(&incoming))
                .expect_err("expected hash collision");
            assert_eq!(err.class, ErrorClass::Corruption);
            assert_eq!(err.origin, ErrorOrigin::Index);
        });
    }

    #[test]
    fn unique_collision_row_key_mismatch_is_corruption() {
        reset_stores();

        let indexed = tagged(1, "alpha");
        let corrupted = tagged(2, "alpha");

        // The index points at `indexed`, but the row stored under that key
        // decodes as `corrupted`.
        insert_row(&indexed, &corrupted);
        insert_index_entry(&indexed);

        let incoming = tagged(3, "alpha");
        let err = plan_index_mutation_for_entity::<TestEntity>(&DB, None, Some(&incoming))
            .expect_err("expected row key mismatch corruption");
        assert_eq!(err.class, ErrorClass::Corruption);
        assert_eq!(err.origin, ErrorOrigin::Index);
    }

    #[test]
    fn insert_into_empty_index_writes_fresh_entry() {
        reset_stores();

        let incoming = tagged(1, "alpha");
        let plan = plan_index_mutation_for_entity::<TestEntity>(&DB, None, Some(&incoming))
            .expect("insert into empty index should plan");

        assert_eq!(plan.apply.len(), 1);
        assert_eq!(plan.commit_ops.len(), 1);
        let op = &plan.commit_ops[0];
        assert_eq!(op.store, INDEX_STORE_PATH);
        assert_eq!(op.key, raw_index_key(&incoming).as_bytes().to_vec());
        assert!(op.value == Some(single_key_entry_bytes(&incoming)));
    }

    #[test]
    fn update_with_unchanged_index_value_rewrites_single_entry() {
        reset_stores();

        let existing = tagged(1, "alpha");
        seed_entity(&existing);

        let plan =
            plan_index_mutation_for_entity::<TestEntity>(&DB, Some(&existing), Some(&existing))
                .expect("update with unchanged tag should plan");

        // Removal and insertion hit the same key and collapse into one write
        // whose entry lists the entity key exactly once.
        assert_eq!(plan.commit_ops.len(), 1);
        let op = &plan.commit_ops[0];
        assert_eq!(op.key, raw_index_key(&existing).as_bytes().to_vec());
        assert!(op.value == Some(single_key_entry_bytes(&existing)));
    }

    #[test]
    fn delete_emits_index_removal() {
        reset_stores();

        let existing = tagged(1, "alpha");
        seed_entity(&existing);

        let plan = plan_index_mutation_for_entity::<TestEntity>(&DB, Some(&existing), None)
            .expect("delete should plan");

        assert_eq!(plan.commit_ops.len(), 1);
        let op = &plan.commit_ops[0];
        assert_eq!(op.key, raw_index_key(&existing).as_bytes().to_vec());
        assert!(op.value.is_none());
    }
}