1use crate::{
2 db::{
3 CommitIndexOp,
4 executor::ExecutorError,
5 index::{
6 IndexEntry, IndexEntryCorruption, IndexEntryEncodeError, IndexKey, IndexStore,
7 RawIndexEntry, RawIndexKey,
8 },
9 store::DataKey,
10 },
11 error::{ErrorClass, ErrorOrigin, InternalError},
12 key::Key,
13 model::index::IndexModel,
14 obs::sink::{self, MetricsEvent},
15 traits::{EntityKind, Storable},
16};
17use std::{cell::RefCell, collections::BTreeMap, thread::LocalKey};
18
19#[derive(Debug)]
24pub struct IndexApplyPlan {
25 pub index: &'static IndexModel,
26 pub store: &'static LocalKey<RefCell<IndexStore>>,
27}
28
29#[derive(Debug)]
34pub struct IndexMutationPlan {
35 pub apply: Vec<IndexApplyPlan>,
36 pub commit_ops: Vec<CommitIndexOp>,
37}
38
39pub fn plan_index_mutation_for_entity<E: EntityKind>(
49 db: &crate::db::Db<E::Canister>,
50 old: Option<&E>,
51 new: Option<&E>,
52) -> Result<IndexMutationPlan, InternalError> {
53 let old_entity_key = old.map(EntityKind::key);
54 let new_entity_key = new.map(EntityKind::key);
55
56 let mut apply = Vec::with_capacity(E::INDEXES.len());
57 let mut commit_ops = Vec::new();
58
59 for index in E::INDEXES {
60 let store = db.with_index(|reg| reg.try_get_store(index.store))?;
61
62 let old_key = match old {
63 Some(entity) => IndexKey::new(entity, index)?,
64 None => None,
65 };
66 let new_key = match new {
67 Some(entity) => IndexKey::new(entity, index)?,
68 None => None,
69 };
70
71 let old_entry = load_existing_entry(store, index, old)?;
72 if let Some(old_key) = &old_key {
74 let Some(old_entity_key) = old_entity_key else {
75 return Err(InternalError::new(
76 ErrorClass::Internal,
77 ErrorOrigin::Index,
78 "missing old entity key for index removal".to_string(),
79 ));
80 };
81 let entry = old_entry.as_ref().ok_or_else(|| {
82 ExecutorError::corruption(
83 ErrorOrigin::Index,
84 format!(
85 "index corrupted: {} ({}) -> {}",
86 E::PATH,
87 index.fields.join(", "),
88 IndexEntryCorruption::missing_key(old_key.to_raw(), old_entity_key)
89 ),
90 )
91 })?;
92 if index.unique && entry.len() > 1 {
93 return Err(ExecutorError::corruption(
94 ErrorOrigin::Index,
95 format!(
96 "index corrupted: {} ({}) -> {}",
97 E::PATH,
98 index.fields.join(", "),
99 IndexEntryCorruption::NonUniqueEntry { keys: entry.len() }
100 ),
101 )
102 .into());
103 }
104 if !entry.contains(&old_entity_key) {
105 return Err(ExecutorError::corruption(
106 ErrorOrigin::Index,
107 format!(
108 "index corrupted: {} ({}) -> {}",
109 E::PATH,
110 index.fields.join(", "),
111 IndexEntryCorruption::missing_key(old_key.to_raw(), old_entity_key)
112 ),
113 )
114 .into());
115 }
116 }
117 let new_entry = if old_key == new_key {
118 old_entry.clone()
119 } else {
120 load_existing_entry(store, index, new)?
121 };
122
123 validate_unique_constraint::<E>(
124 db,
125 index,
126 new_entry.as_ref(),
127 new_entity_key.as_ref(),
128 new,
129 )?;
130
131 build_commit_ops_for_index::<E>(
132 &mut commit_ops,
133 index,
134 old_key,
135 new_key,
136 old_entry,
137 new_entry,
138 old_entity_key,
139 new_entity_key,
140 )?;
141
142 apply.push(IndexApplyPlan { index, store });
143 }
144
145 Ok(IndexMutationPlan { apply, commit_ops })
146}
147
148fn load_existing_entry<E: EntityKind>(
149 store: &'static LocalKey<RefCell<IndexStore>>,
150 index: &'static IndexModel,
151 entity: Option<&E>,
152) -> Result<Option<IndexEntry>, InternalError> {
153 let Some(entity) = entity else {
154 return Ok(None);
155 };
156 let Some(key) = IndexKey::new(entity, index)? else {
157 return Ok(None);
158 };
159
160 store
161 .with_borrow(|s| s.get(&key.to_raw()))
162 .map(|raw| {
163 raw.try_decode().map_err(|err| {
164 ExecutorError::corruption(
165 ErrorOrigin::Index,
166 format!(
167 "index corrupted: {} ({}) -> {}",
168 E::PATH,
169 index.fields.join(", "),
170 err
171 ),
172 )
173 .into()
174 })
175 })
176 .transpose()
177}
178
179#[expect(clippy::too_many_lines)]
185fn validate_unique_constraint<E: EntityKind>(
186 db: &crate::db::Db<E::Canister>,
187 index: &IndexModel,
188 entry: Option<&IndexEntry>,
189 new_key: Option<&Key>,
190 new_entity: Option<&E>,
191) -> Result<(), InternalError> {
192 if !index.unique {
193 return Ok(());
194 }
195
196 let Some(entry) = entry else {
197 return Ok(());
198 };
199
200 if entry.len() > 1 {
201 return Err(ExecutorError::corruption(
202 ErrorOrigin::Index,
203 format!(
204 "index corrupted: {} ({}) -> {} keys",
205 E::PATH,
206 index.fields.join(", "),
207 entry.len()
208 ),
209 )
210 .into());
211 }
212
213 let Some(new_key) = new_key else {
214 return Ok(());
215 };
216 if entry.contains(new_key) {
217 return Ok(());
218 }
219
220 let Some(new_entity) = new_entity else {
221 return Err(InternalError::new(
222 ErrorClass::InvariantViolation,
223 ErrorOrigin::Index,
224 "missing entity payload during unique validation".to_string(),
225 ));
226 };
227 let existing_key = entry.single_key().ok_or_else(|| {
228 ExecutorError::corruption(
229 ErrorOrigin::Index,
230 format!(
231 "index corrupted: {} ({}) -> {} keys",
232 E::PATH,
233 index.fields.join(", "),
234 entry.len()
235 ),
236 )
237 })?;
238
239 let stored = {
240 let data_key = DataKey::new::<E>(existing_key);
241 let row = db.context::<E>().read_strict(&data_key)?;
242 row.try_decode::<E>().map_err(|err| {
243 ExecutorError::corruption(
244 ErrorOrigin::Serialize,
245 format!("failed to deserialize row: {data_key} ({err})"),
246 )
247 })?
248 };
249 let stored_key = stored.key();
250 if stored_key != existing_key {
251 return Err(ExecutorError::corruption(
253 ErrorOrigin::Store,
254 format!(
255 "index corrupted: {} ({}) -> {}",
256 E::PATH,
257 index.fields.join(", "),
258 IndexEntryCorruption::RowKeyMismatch {
259 indexed_key: Box::new(existing_key),
260 row_key: Box::new(stored_key),
261 }
262 ),
263 )
264 .into());
265 }
266
267 for field in index.fields {
268 let expected = new_entity.get_value(field).ok_or_else(|| {
269 InternalError::new(
270 ErrorClass::InvariantViolation,
271 ErrorOrigin::Index,
272 format!(
273 "index field missing on lookup entity: {} ({})",
274 E::PATH,
275 field
276 ),
277 )
278 })?;
279 let actual = stored.get_value(field).ok_or_else(|| {
280 ExecutorError::corruption(
281 ErrorOrigin::Index,
282 format!(
283 "index corrupted: {} ({}) -> stored entity missing field",
284 E::PATH,
285 field
286 ),
287 )
288 })?;
289
290 if expected != actual {
291 return Err(ExecutorError::corruption(
292 ErrorOrigin::Index,
293 format!("index hash collision: {} ({})", E::PATH, field),
294 )
295 .into());
296 }
297 }
298
299 sink::record(MetricsEvent::UniqueViolation {
300 entity_path: E::PATH,
301 });
302
303 Err(ExecutorError::index_violation(E::PATH, index.fields).into())
304}
305
306#[allow(clippy::too_many_arguments)]
315fn build_commit_ops_for_index<E: EntityKind>(
316 commit_ops: &mut Vec<CommitIndexOp>,
317 index: &'static IndexModel,
318 old_key: Option<IndexKey>,
319 new_key: Option<IndexKey>,
320 old_entry: Option<IndexEntry>,
321 new_entry: Option<IndexEntry>,
322 old_entity_key: Option<Key>,
323 new_entity_key: Option<Key>,
324) -> Result<(), InternalError> {
325 let mut touched: BTreeMap<RawIndexKey, Option<IndexEntry>> = BTreeMap::new();
326 let fields = index.fields.join(", ");
327
328 if let Some(old_key) = old_key {
331 let Some(old_entity_key) = old_entity_key else {
332 return Err(InternalError::new(
333 ErrorClass::Internal,
334 ErrorOrigin::Index,
335 "missing old entity key for index removal".to_string(),
336 ));
337 };
338
339 if let Some(mut entry) = old_entry {
340 entry.remove_key(&old_entity_key);
341 let after = if entry.is_empty() { None } else { Some(entry) };
342 touched.insert(old_key.to_raw(), after);
343 } else {
344 touched.insert(old_key.to_raw(), None);
346 }
347 }
348
349 if let Some(new_key) = new_key {
352 let Some(new_entity_key) = new_entity_key else {
353 return Err(InternalError::new(
354 ErrorClass::Internal,
355 ErrorOrigin::Index,
356 "missing new entity key for index insertion".to_string(),
357 ));
358 };
359
360 let raw_key = new_key.to_raw();
361
362 let mut entry = if let Some(existing) = touched.remove(&raw_key) {
367 existing.unwrap_or_else(|| IndexEntry::new(new_entity_key))
368 } else if let Some(existing) = new_entry {
369 existing
370 } else {
371 IndexEntry::new(new_entity_key)
372 };
373
374 entry.insert_key(new_entity_key);
375 touched.insert(raw_key, Some(entry));
376 }
377
378 for (raw_key, entry) in touched {
381 let value = if let Some(entry) = entry {
382 let raw = RawIndexEntry::try_from_entry(&entry).map_err(|err| match err {
383 IndexEntryEncodeError::TooManyKeys { keys } => InternalError::new(
384 ErrorClass::Unsupported,
385 ErrorOrigin::Index,
386 format!(
387 "index entry exceeds max keys: {} ({}) -> {} keys",
388 E::PATH,
389 fields,
390 keys
391 ),
392 ),
393 IndexEntryEncodeError::KeyEncoding(err) => InternalError::new(
394 ErrorClass::Unsupported,
395 ErrorOrigin::Index,
396 format!(
397 "index entry key encoding failed: {} ({}) -> {err}",
398 E::PATH,
399 fields
400 ),
401 ),
402 })?;
403 Some(raw.into_bytes())
404 } else {
405 None
406 };
407
408 commit_ops.push(CommitIndexOp {
409 store: index.store.to_string(),
410 key: raw_key.as_bytes().to_vec(),
411 value,
412 });
413 }
414
415 Ok(())
416}
417
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        db::{
            Db,
            index::IndexStoreRegistry,
            index::fingerprint::with_test_hash_override,
            store::{DataStore, DataStoreRegistry, RawRow},
        },
        error::{ErrorClass, ErrorOrigin},
        model::{
            entity::EntityModel,
            field::{EntityFieldKind, EntityFieldModel},
            index::IndexModel,
        },
        serialize::serialize,
        traits::{
            CanisterKind, DataStoreKind, EntityKind, FieldValues, Path, SanitizeAuto,
            SanitizeCustom, ValidateAuto, ValidateCustom, View, Visitable,
        },
        types::Ulid,
        value::Value,
    };
    use serde::{Deserialize, Serialize};
    use std::cell::RefCell;

    const CANISTER_PATH: &str = "index_plan_test::TestCanister";
    const DATA_STORE_PATH: &str = "index_plan_test::TestDataStore";
    const INDEX_STORE_PATH: &str = "index_plan_test::TestIndexStore";
    const ENTITY_PATH: &str = "index_plan_test::TestEntity";

    const INDEX_FIELDS: [&str; 1] = ["tag"];
    const INDEX_MODEL: IndexModel = IndexModel::new(
        "index_plan_test::idx_tag",
        INDEX_STORE_PATH,
        &INDEX_FIELDS,
        true,
    );
    const INDEXES: [&IndexModel; 1] = [&INDEX_MODEL];

    const TEST_FIELDS: [EntityFieldModel; 2] = [
        EntityFieldModel {
            name: "id",
            kind: EntityFieldKind::Ulid,
        },
        EntityFieldModel {
            name: "tag",
            kind: EntityFieldKind::Text,
        },
    ];
    const TEST_MODEL: EntityModel = EntityModel {
        path: ENTITY_PATH,
        entity_name: "TestEntity",
        primary_key: &TEST_FIELDS[0],
        fields: &TEST_FIELDS,
        indexes: &INDEXES,
    };
    const MISSING_ENTITY_PATH: &str = "index_plan_test::MissingFieldEntity";
    const MISSING_MODEL: EntityModel = EntityModel {
        path: MISSING_ENTITY_PATH,
        entity_name: "MissingFieldEntity",
        primary_key: &TEST_FIELDS[0],
        fields: &TEST_FIELDS,
        indexes: &INDEXES,
    };

    #[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
    struct TestEntity {
        id: Ulid,
        tag: String,
    }

    impl Path for TestEntity {
        const PATH: &'static str = ENTITY_PATH;
    }

    impl View for TestEntity {
        type ViewType = Self;

        fn to_view(&self) -> Self::ViewType {
            self.clone()
        }

        fn from_view(view: Self::ViewType) -> Self {
            view
        }
    }

    impl SanitizeAuto for TestEntity {}
    impl SanitizeCustom for TestEntity {}
    impl ValidateAuto for TestEntity {}
    impl ValidateCustom for TestEntity {}
    impl Visitable for TestEntity {}

    impl FieldValues for TestEntity {
        fn get_value(&self, field: &str) -> Option<Value> {
            match field {
                "id" => Some(Value::Ulid(self.id)),
                "tag" => Some(Value::Text(self.tag.clone())),
                _ => None,
            }
        }
    }

    #[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
    struct MissingFieldEntity {
        id: Ulid,
        tag: String,
    }

    impl Path for MissingFieldEntity {
        const PATH: &'static str = MISSING_ENTITY_PATH;
    }

    impl View for MissingFieldEntity {
        type ViewType = Self;

        fn to_view(&self) -> Self::ViewType {
            self.clone()
        }

        fn from_view(view: Self::ViewType) -> Self {
            view
        }
    }

    impl SanitizeAuto for MissingFieldEntity {}
    impl SanitizeCustom for MissingFieldEntity {}
    impl ValidateAuto for MissingFieldEntity {}
    impl ValidateCustom for MissingFieldEntity {}
    impl Visitable for MissingFieldEntity {}

    impl FieldValues for MissingFieldEntity {
        fn get_value(&self, field: &str) -> Option<Value> {
            match field {
                "id" => Some(Value::Ulid(self.id)),
                "tag" if self.tag == "__missing__" => None,
                "tag" => Some(Value::Text(self.tag.clone())),
                _ => None,
            }
        }
    }

    #[derive(Clone, Copy)]
    struct TestCanister;

    impl Path for TestCanister {
        const PATH: &'static str = CANISTER_PATH;
    }

    impl CanisterKind for TestCanister {}

    struct TestStore;

    impl Path for TestStore {
        const PATH: &'static str = DATA_STORE_PATH;
    }

    impl DataStoreKind for TestStore {
        type Canister = TestCanister;
    }

    impl EntityKind for TestEntity {
        type PrimaryKey = Ulid;
        type DataStore = TestStore;
        type Canister = TestCanister;

        const ENTITY_NAME: &'static str = "TestEntity";
        const PRIMARY_KEY: &'static str = "id";
        const FIELDS: &'static [&'static str] = &["id", "tag"];
        const INDEXES: &'static [&'static IndexModel] = &INDEXES;
        const MODEL: &'static EntityModel = &TEST_MODEL;

        fn key(&self) -> crate::key::Key {
            self.id.into()
        }

        fn primary_key(&self) -> Self::PrimaryKey {
            self.id
        }

        fn set_primary_key(&mut self, key: Self::PrimaryKey) {
            self.id = key;
        }
    }

    impl EntityKind for MissingFieldEntity {
        type PrimaryKey = Ulid;
        type DataStore = TestStore;
        type Canister = TestCanister;

        const ENTITY_NAME: &'static str = "MissingFieldEntity";
        const PRIMARY_KEY: &'static str = "id";
        const FIELDS: &'static [&'static str] = &["id", "tag"];
        const INDEXES: &'static [&'static IndexModel] = &INDEXES;
        const MODEL: &'static EntityModel = &MISSING_MODEL;

        fn key(&self) -> crate::key::Key {
            self.id.into()
        }

        fn primary_key(&self) -> Self::PrimaryKey {
            self.id
        }

        fn set_primary_key(&mut self, key: Self::PrimaryKey) {
            self.id = key;
        }
    }

    canic_memory::eager_static! {
        static TEST_DATA_STORE: RefCell<DataStore> =
            RefCell::new(DataStore::init(canic_memory::ic_memory!(DataStore, 20)));
    }

    canic_memory::eager_static! {
        static TEST_INDEX_STORE: RefCell<IndexStore> =
            RefCell::new(IndexStore::init(
                canic_memory::ic_memory!(IndexStore, 21),
                canic_memory::ic_memory!(IndexStore, 22),
            ));
    }

    thread_local! {
        static DATA_REGISTRY: DataStoreRegistry = {
            let mut reg = DataStoreRegistry::new();
            reg.register(DATA_STORE_PATH, &TEST_DATA_STORE);
            reg
        };

        static INDEX_REGISTRY: IndexStoreRegistry = {
            let mut reg = IndexStoreRegistry::new();
            reg.register(INDEX_STORE_PATH, &TEST_INDEX_STORE);
            reg
        };
    }

    static DB: Db<TestCanister> = Db::new(&DATA_REGISTRY, &INDEX_REGISTRY);

    canic_memory::eager_init!({
        canic_memory::ic_memory_range!(0, 60);
    });

    fn reset_stores() {
        TEST_DATA_STORE.with_borrow_mut(DataStore::clear);
        TEST_INDEX_STORE.with_borrow_mut(IndexStore::clear);
    }

    fn seed_entity(entity: &TestEntity) {
        let data_key = DataKey::new::<TestEntity>(entity.id);
        let raw_key = data_key.to_raw().expect("data key encode");
        let raw_row = RawRow::try_new(serialize(entity).unwrap()).unwrap();
        TEST_DATA_STORE.with_borrow_mut(|store| store.insert(raw_key, raw_row));

        let index_key = IndexKey::new(entity, &INDEX_MODEL)
            .expect("index key")
            .expect("index key missing");
        let raw_index_key = index_key.to_raw();
        let entry = IndexEntry::new(entity.key());
        let raw_entry = RawIndexEntry::try_from_entry(&entry).unwrap();
        TEST_INDEX_STORE.with_borrow_mut(|store| store.insert(raw_index_key, raw_entry));
    }

    /// Raw index-key bytes the planner is expected to emit for `entity`.
    fn expected_index_key_bytes(entity: &TestEntity) -> Vec<u8> {
        IndexKey::new(entity, &INDEX_MODEL)
            .expect("index key")
            .expect("index key missing")
            .to_raw()
            .as_bytes()
            .to_vec()
    }

    #[test]
    fn unique_collision_equal_values_is_conflict() {
        with_test_hash_override([0xAA; 16], || {
            reset_stores();

            let existing = TestEntity {
                id: Ulid::from_u128(1),
                tag: "alpha".to_string(),
            };
            seed_entity(&existing);

            let incoming = TestEntity {
                id: Ulid::from_u128(2),
                tag: "alpha".to_string(),
            };

            let err = plan_index_mutation_for_entity::<TestEntity>(&DB, None, Some(&incoming))
                .expect_err("expected unique conflict");
            assert_eq!(err.class, ErrorClass::Conflict);
        });
    }

    #[test]
    fn unique_collision_different_values_is_corruption() {
        with_test_hash_override([0xAA; 16], || {
            reset_stores();

            let existing = TestEntity {
                id: Ulid::from_u128(1),
                tag: "alpha".to_string(),
            };
            seed_entity(&existing);

            let incoming = TestEntity {
                id: Ulid::from_u128(2),
                tag: "beta".to_string(),
            };

            let err = plan_index_mutation_for_entity::<TestEntity>(&DB, None, Some(&incoming))
                .expect_err("expected hash collision");
            assert_eq!(err.class, ErrorClass::Corruption);
            assert_eq!(err.origin, ErrorOrigin::Index);
        });
    }

    #[test]
    fn unique_collision_row_key_mismatch_is_corruption() {
        reset_stores();

        let indexed = TestEntity {
            id: Ulid::from_u128(1),
            tag: "alpha".to_string(),
        };
        let corrupted = TestEntity {
            id: Ulid::from_u128(2),
            tag: "alpha".to_string(),
        };

        let data_key = DataKey::new::<TestEntity>(indexed.id);
        let raw_key = data_key.to_raw().expect("data key encode");
        let raw_row = RawRow::try_new(serialize(&corrupted).unwrap()).unwrap();
        TEST_DATA_STORE.with_borrow_mut(|store| store.insert(raw_key, raw_row));

        let index_key = IndexKey::new(&indexed, &INDEX_MODEL)
            .expect("index key")
            .expect("index key missing");
        let raw_index_key = index_key.to_raw();
        let entry = IndexEntry::new(indexed.key());
        let raw_entry = RawIndexEntry::try_from_entry(&entry).unwrap();
        TEST_INDEX_STORE.with_borrow_mut(|store| store.insert(raw_index_key, raw_entry));

        let incoming = TestEntity {
            id: Ulid::from_u128(3),
            tag: "alpha".to_string(),
        };

        let err = plan_index_mutation_for_entity::<TestEntity>(&DB, None, Some(&incoming))
            .expect_err("expected row key mismatch corruption");
        assert_eq!(err.class, ErrorClass::Corruption);
        assert_eq!(err.origin, ErrorOrigin::Store);
    }

    #[test]
    fn unique_collision_stored_missing_field_is_corruption() {
        reset_stores();

        let stored = MissingFieldEntity {
            id: Ulid::from_u128(1),
            tag: "__missing__".to_string(),
        };
        let data_key = DataKey::new::<MissingFieldEntity>(stored.id);
        let raw_key = data_key.to_raw().expect("data key encode");
        let raw_row = RawRow::try_new(serialize(&stored).unwrap()).unwrap();
        TEST_DATA_STORE.with_borrow_mut(|store| store.insert(raw_key, raw_row));

        let incoming = MissingFieldEntity {
            id: Ulid::from_u128(2),
            tag: "alpha".to_string(),
        };

        let index_key = IndexKey::new(&incoming, &INDEX_MODEL)
            .expect("index key")
            .expect("index key missing");
        let raw_index_key = index_key.to_raw();
        let entry = IndexEntry::new(stored.key());
        let raw_entry = RawIndexEntry::try_from_entry(&entry).unwrap();
        TEST_INDEX_STORE.with_borrow_mut(|store| store.insert(raw_index_key, raw_entry));

        let err = plan_index_mutation_for_entity::<MissingFieldEntity>(&DB, None, Some(&incoming))
            .expect_err("expected missing stored field corruption");
        assert_eq!(err.class, ErrorClass::Corruption);
        assert_eq!(err.origin, ErrorOrigin::Index);
    }

    #[test]
    fn insert_emits_single_put() {
        reset_stores();

        let entity = TestEntity {
            id: Ulid::from_u128(1),
            tag: "alpha".to_string(),
        };

        let plan = plan_index_mutation_for_entity::<TestEntity>(&DB, None, Some(&entity))
            .expect("insert should plan");

        assert_eq!(plan.apply.len(), 1);
        assert_eq!(plan.commit_ops.len(), 1);
        let op = &plan.commit_ops[0];
        assert_eq!(op.store, INDEX_STORE_PATH);
        assert_eq!(op.key, expected_index_key_bytes(&entity));
        assert!(op.value.is_some());
    }

    #[test]
    fn delete_emits_single_removal() {
        reset_stores();

        let entity = TestEntity {
            id: Ulid::from_u128(1),
            tag: "alpha".to_string(),
        };
        seed_entity(&entity);

        let plan = plan_index_mutation_for_entity::<TestEntity>(&DB, Some(&entity), None)
            .expect("delete should plan");

        assert_eq!(plan.commit_ops.len(), 1);
        assert_eq!(plan.commit_ops[0].key, expected_index_key_bytes(&entity));
        assert!(plan.commit_ops[0].value.is_none());
    }

    #[test]
    fn resave_unchanged_entity_rewrites_same_key() {
        reset_stores();

        let entity = TestEntity {
            id: Ulid::from_u128(1),
            tag: "alpha".to_string(),
        };
        seed_entity(&entity);

        let plan =
            plan_index_mutation_for_entity::<TestEntity>(&DB, Some(&entity), Some(&entity))
                .expect("re-save should plan");

        assert_eq!(plan.commit_ops.len(), 1);
        assert_eq!(plan.commit_ops[0].key, expected_index_key_bytes(&entity));
        assert!(plan.commit_ops[0].value.is_some());
    }

    #[test]
    fn changing_indexed_value_moves_entry() {
        reset_stores();

        let original = TestEntity {
            id: Ulid::from_u128(1),
            tag: "alpha".to_string(),
        };
        seed_entity(&original);
        let updated = TestEntity {
            id: Ulid::from_u128(1),
            tag: "beta".to_string(),
        };

        let plan =
            plan_index_mutation_for_entity::<TestEntity>(&DB, Some(&original), Some(&updated))
                .expect("update should plan");

        assert_eq!(plan.commit_ops.len(), 2);
        let old_bytes = expected_index_key_bytes(&original);
        let new_bytes = expected_index_key_bytes(&updated);
        let removal = plan
            .commit_ops
            .iter()
            .find(|op| op.key == old_bytes)
            .expect("op for old key");
        assert!(removal.value.is_none());
        let put = plan
            .commit_ops
            .iter()
            .find(|op| op.key == new_bytes)
            .expect("op for new key");
        assert!(put.value.is_some());
    }

    #[test]
    fn removing_without_index_entry_is_corruption() {
        reset_stores();

        let entity = TestEntity {
            id: Ulid::from_u128(1),
            tag: "alpha".to_string(),
        };

        let err = plan_index_mutation_for_entity::<TestEntity>(&DB, Some(&entity), None)
            .expect_err("expected missing entry corruption");
        assert_eq!(err.class, ErrorClass::Corruption);
        assert_eq!(err.origin, ErrorOrigin::Index);
    }

    #[test]
    fn removing_entity_absent_from_entry_is_corruption() {
        reset_stores();

        let indexed = TestEntity {
            id: Ulid::from_u128(1),
            tag: "alpha".to_string(),
        };
        seed_entity(&indexed);
        let stale = TestEntity {
            id: Ulid::from_u128(2),
            tag: "alpha".to_string(),
        };

        let err = plan_index_mutation_for_entity::<TestEntity>(&DB, Some(&stale), None)
            .expect_err("expected missing key corruption");
        assert_eq!(err.class, ErrorClass::Corruption);
        assert_eq!(err.origin, ErrorOrigin::Index);
    }
}