1use crate::{
2 db::{
3 CommitIndexOp,
4 executor::ExecutorError,
5 index::{
6 IndexEntry, IndexEntryCorruption, IndexEntryEncodeError, IndexKey, IndexStore,
7 RawIndexEntry, RawIndexKey,
8 },
9 store::DataKey,
10 },
11 error::{ErrorClass, ErrorOrigin, InternalError},
12 key::Key,
13 model::index::IndexModel,
14 obs::sink::{self, MetricsEvent},
15 traits::{EntityKind, Storable},
16};
17use std::{cell::RefCell, collections::BTreeMap, thread::LocalKey};
18
/// One index touched by a planned mutation, paired with the store that backs it.
///
/// `plan_index_mutation_for_entity` pushes one of these per entry in `E::INDEXES`.
#[derive(Debug)]
pub struct IndexApplyPlan {
    /// Static model describing the index.
    pub index: &'static IndexModel,
    /// Thread-local index store resolved for `index.store`.
    pub store: &'static LocalKey<RefCell<IndexStore>>,
}
28
/// Result of planning the index side of an entity mutation.
///
/// Produced by `plan_index_mutation_for_entity`.
#[derive(Debug)]
pub struct IndexMutationPlan {
    /// One entry per index of the entity, in `E::INDEXES` order.
    pub apply: Vec<IndexApplyPlan>,
    /// Raw index writes (`value: Some(..)`) and deletes (`value: None`) accumulated
    /// across all indexes.
    pub commit_ops: Vec<CommitIndexOp>,
}
38
39pub fn plan_index_mutation_for_entity<E: EntityKind>(
49 db: &crate::db::Db<E::Canister>,
50 old: Option<&E>,
51 new: Option<&E>,
52) -> Result<IndexMutationPlan, InternalError> {
53 let old_entity_key = old.map(EntityKind::key);
54 let new_entity_key = new.map(EntityKind::key);
55
56 let mut apply = Vec::with_capacity(E::INDEXES.len());
57 let mut commit_ops = Vec::new();
58
59 for index in E::INDEXES {
60 let store = db.with_index(|reg| reg.try_get_store(index.store))?;
61
62 let old_key = match old {
63 Some(entity) => IndexKey::new(entity, index)?,
64 None => None,
65 };
66 let new_key = match new {
67 Some(entity) => IndexKey::new(entity, index)?,
68 None => None,
69 };
70
71 let old_entry = load_existing_entry(store, index, old)?;
72 if let Some(old_key) = &old_key {
74 let Some(old_entity_key) = old_entity_key else {
75 return Err(InternalError::new(
76 ErrorClass::Internal,
77 ErrorOrigin::Index,
78 "missing old entity key for index removal".to_string(),
79 ));
80 };
81 let entry = old_entry.as_ref().ok_or_else(|| {
82 ExecutorError::corruption(
83 ErrorOrigin::Index,
84 format!(
85 "index corrupted: {} ({}) -> {}",
86 E::PATH,
87 index.fields.join(", "),
88 IndexEntryCorruption::missing_key(old_key.to_raw(), old_entity_key)
89 ),
90 )
91 })?;
92 if index.unique && entry.len() > 1 {
93 return Err(ExecutorError::corruption(
94 ErrorOrigin::Index,
95 format!(
96 "index corrupted: {} ({}) -> {}",
97 E::PATH,
98 index.fields.join(", "),
99 IndexEntryCorruption::NonUniqueEntry { keys: entry.len() }
100 ),
101 )
102 .into());
103 }
104 if !entry.contains(&old_entity_key) {
105 return Err(ExecutorError::corruption(
106 ErrorOrigin::Index,
107 format!(
108 "index corrupted: {} ({}) -> {}",
109 E::PATH,
110 index.fields.join(", "),
111 IndexEntryCorruption::missing_key(old_key.to_raw(), old_entity_key)
112 ),
113 )
114 .into());
115 }
116 }
117 let new_entry = if old_key == new_key {
118 old_entry.clone()
119 } else {
120 load_existing_entry(store, index, new)?
121 };
122
123 validate_unique_constraint::<E>(
124 db,
125 index,
126 new_entry.as_ref(),
127 new_entity_key.as_ref(),
128 new,
129 )?;
130
131 build_commit_ops_for_index::<E>(
132 &mut commit_ops,
133 index,
134 old_key,
135 new_key,
136 old_entry,
137 new_entry,
138 old_entity_key,
139 new_entity_key,
140 )?;
141
142 apply.push(IndexApplyPlan { index, store });
143 }
144
145 Ok(IndexMutationPlan { apply, commit_ops })
146}
147
148fn load_existing_entry<E: EntityKind>(
149 store: &'static LocalKey<RefCell<IndexStore>>,
150 index: &'static IndexModel,
151 entity: Option<&E>,
152) -> Result<Option<IndexEntry>, InternalError> {
153 let Some(entity) = entity else {
154 return Ok(None);
155 };
156 let Some(key) = IndexKey::new(entity, index)? else {
157 return Ok(None);
158 };
159
160 store
161 .with_borrow(|s| s.get(&key.to_raw()))
162 .map(|raw| {
163 raw.try_decode().map_err(|err| {
164 ExecutorError::corruption(
165 ErrorOrigin::Index,
166 format!(
167 "index corrupted: {} ({}) -> {}",
168 E::PATH,
169 index.fields.join(", "),
170 err
171 ),
172 )
173 .into()
174 })
175 })
176 .transpose()
177}
178
179#[expect(clippy::too_many_lines)]
185fn validate_unique_constraint<E: EntityKind>(
186 db: &crate::db::Db<E::Canister>,
187 index: &IndexModel,
188 entry: Option<&IndexEntry>,
189 new_key: Option<&Key>,
190 new_entity: Option<&E>,
191) -> Result<(), InternalError> {
192 if !index.unique {
193 return Ok(());
194 }
195
196 let Some(entry) = entry else {
197 return Ok(());
198 };
199
200 if entry.len() > 1 {
201 return Err(ExecutorError::corruption(
202 ErrorOrigin::Index,
203 format!(
204 "index corrupted: {} ({}) -> {} keys",
205 E::PATH,
206 index.fields.join(", "),
207 entry.len()
208 ),
209 )
210 .into());
211 }
212
213 let Some(new_key) = new_key else {
214 return Ok(());
215 };
216 if entry.contains(new_key) {
217 return Ok(());
218 }
219
220 let Some(new_entity) = new_entity else {
221 return Err(InternalError::new(
222 ErrorClass::InvariantViolation,
223 ErrorOrigin::Index,
224 "missing entity payload during unique validation".to_string(),
225 ));
226 };
227 let existing_key = entry.single_key().ok_or_else(|| {
228 ExecutorError::corruption(
229 ErrorOrigin::Index,
230 format!(
231 "index corrupted: {} ({}) -> {} keys",
232 E::PATH,
233 index.fields.join(", "),
234 entry.len()
235 ),
236 )
237 })?;
238
239 let stored = {
240 let data_key = DataKey::new::<E>(existing_key);
241 let row = db.context::<E>().read_strict(&data_key)?;
242 row.try_decode::<E>().map_err(|err| {
243 ExecutorError::corruption(
244 ErrorOrigin::Serialize,
245 format!("failed to deserialize row: {data_key} ({err})"),
246 )
247 })?
248 };
249 let stored_key = stored.key();
250 if stored_key != existing_key {
251 return Err(ExecutorError::corruption(
253 ErrorOrigin::Index,
254 format!(
255 "index corrupted: {} ({}) -> {}",
256 E::PATH,
257 index.fields.join(", "),
258 IndexEntryCorruption::RowKeyMismatch {
259 indexed_key: Box::new(existing_key),
260 row_key: Box::new(stored_key),
261 }
262 ),
263 )
264 .into());
265 }
266
267 for field in index.fields {
268 let expected = new_entity.get_value(field).ok_or_else(|| {
269 InternalError::new(
270 ErrorClass::InvariantViolation,
271 ErrorOrigin::Index,
272 format!(
273 "index field missing on lookup entity: {} ({})",
274 E::PATH,
275 field
276 ),
277 )
278 })?;
279 let actual = stored.get_value(field).ok_or_else(|| {
280 InternalError::new(
281 ErrorClass::InvariantViolation,
282 ErrorOrigin::Index,
283 format!(
284 "index field missing on stored entity: {} ({})",
285 E::PATH,
286 field
287 ),
288 )
289 })?;
290
291 if expected != actual {
292 return Err(ExecutorError::corruption(
293 ErrorOrigin::Index,
294 format!("index hash collision: {} ({})", E::PATH, field),
295 )
296 .into());
297 }
298 }
299
300 sink::record(MetricsEvent::UniqueViolation {
301 entity_path: E::PATH,
302 });
303
304 Err(ExecutorError::index_violation(E::PATH, index.fields).into())
305}
306
307#[allow(clippy::too_many_arguments)]
316fn build_commit_ops_for_index<E: EntityKind>(
317 commit_ops: &mut Vec<CommitIndexOp>,
318 index: &'static IndexModel,
319 old_key: Option<IndexKey>,
320 new_key: Option<IndexKey>,
321 old_entry: Option<IndexEntry>,
322 new_entry: Option<IndexEntry>,
323 old_entity_key: Option<Key>,
324 new_entity_key: Option<Key>,
325) -> Result<(), InternalError> {
326 let mut touched: BTreeMap<RawIndexKey, Option<IndexEntry>> = BTreeMap::new();
327 let fields = index.fields.join(", ");
328
329 if let Some(old_key) = old_key {
332 let Some(old_entity_key) = old_entity_key else {
333 return Err(InternalError::new(
334 ErrorClass::Internal,
335 ErrorOrigin::Index,
336 "missing old entity key for index removal".to_string(),
337 ));
338 };
339
340 if let Some(mut entry) = old_entry {
341 entry.remove_key(&old_entity_key);
342 let after = if entry.is_empty() { None } else { Some(entry) };
343 touched.insert(old_key.to_raw(), after);
344 } else {
345 touched.insert(old_key.to_raw(), None);
347 }
348 }
349
350 if let Some(new_key) = new_key {
353 let Some(new_entity_key) = new_entity_key else {
354 return Err(InternalError::new(
355 ErrorClass::Internal,
356 ErrorOrigin::Index,
357 "missing new entity key for index insertion".to_string(),
358 ));
359 };
360
361 let raw_key = new_key.to_raw();
362
363 let mut entry = if let Some(existing) = touched.remove(&raw_key) {
368 existing.unwrap_or_else(|| IndexEntry::new(new_entity_key))
369 } else if let Some(existing) = new_entry {
370 existing
371 } else {
372 IndexEntry::new(new_entity_key)
373 };
374
375 entry.insert_key(new_entity_key);
376 touched.insert(raw_key, Some(entry));
377 }
378
379 for (raw_key, entry) in touched {
382 let value = if let Some(entry) = entry {
383 let raw = RawIndexEntry::try_from_entry(&entry).map_err(|err| match err {
384 IndexEntryEncodeError::TooManyKeys { keys } => InternalError::new(
385 ErrorClass::Unsupported,
386 ErrorOrigin::Index,
387 format!(
388 "index entry exceeds max keys: {} ({}) -> {} keys",
389 E::PATH,
390 fields,
391 keys
392 ),
393 ),
394 IndexEntryEncodeError::KeyEncoding(err) => InternalError::new(
395 ErrorClass::Unsupported,
396 ErrorOrigin::Index,
397 format!(
398 "index entry key encoding failed: {} ({}) -> {err}",
399 E::PATH,
400 fields
401 ),
402 ),
403 })?;
404 Some(raw.into_bytes())
405 } else {
406 None
407 };
408
409 commit_ops.push(CommitIndexOp {
410 store: index.store.to_string(),
411 key: raw_key.as_bytes().to_vec(),
412 value,
413 });
414 }
415
416 Ok(())
417}
418
// Unit tests for the unique-constraint paths of `plan_index_mutation_for_entity`.
//
// The module declares a minimal entity (`TestEntity`) with one index on `tag`,
// backs it with test-only data and index stores, and seeds those stores directly
// so each test starts from a known on-disk shape.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        db::{
            Db,
            index::IndexStoreRegistry,
            index::fingerprint::with_test_hash_override,
            store::{DataStore, DataStoreRegistry, RawRow},
        },
        error::{ErrorClass, ErrorOrigin},
        model::{
            entity::EntityModel,
            field::{EntityFieldKind, EntityFieldModel},
            index::IndexModel,
        },
        serialize::serialize,
        traits::{
            CanisterKind, EntityKind, FieldValues, Path, SanitizeAuto, SanitizeCustom, StoreKind,
            ValidateAuto, ValidateCustom, View, Visitable,
        },
        types::Ulid,
        value::Value,
    };
    use serde::{Deserialize, Serialize};
    use std::cell::RefCell;

    // Paths identifying the test canister, its two stores and the test entity.
    const CANISTER_PATH: &str = "index_plan_test::TestCanister";
    const DATA_STORE_PATH: &str = "index_plan_test::TestDataStore";
    const INDEX_STORE_PATH: &str = "index_plan_test::TestIndexStore";
    const ENTITY_PATH: &str = "index_plan_test::TestEntity";

    // Single index over the `tag` field, stored in the test index store.
    // NOTE(review): the trailing `true` is presumably the `unique` flag (the tests
    // below only reach the uniqueness checks if the index is unique) — confirm
    // against `IndexModel::new`.
    const INDEX_FIELDS: [&str; 1] = ["tag"];
    const INDEX_MODEL: IndexModel = IndexModel::new(
        "index_plan_test::idx_tag",
        INDEX_STORE_PATH,
        &INDEX_FIELDS,
        true,
    );
    const INDEXES: [&IndexModel; 1] = [&INDEX_MODEL];

    // Field and entity models mirroring the `TestEntity` struct below.
    const TEST_FIELDS: [EntityFieldModel; 2] = [
        EntityFieldModel {
            name: "id",
            kind: EntityFieldKind::Ulid,
        },
        EntityFieldModel {
            name: "tag",
            kind: EntityFieldKind::Text,
        },
    ];
    const TEST_MODEL: EntityModel = EntityModel {
        path: ENTITY_PATH,
        entity_name: "TestEntity",
        primary_key: &TEST_FIELDS[0],
        fields: &TEST_FIELDS,
        indexes: &INDEXES,
    };

    // Minimal entity: a ULID primary key plus the indexed `tag`.
    #[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
    struct TestEntity {
        id: Ulid,
        tag: String,
    }

    impl Path for TestEntity {
        const PATH: &'static str = ENTITY_PATH;
    }

    // The entity is its own view type, so both conversions are trivial.
    impl View for TestEntity {
        type ViewType = Self;

        fn to_view(&self) -> Self::ViewType {
            self.clone()
        }

        fn from_view(view: Self::ViewType) -> Self {
            view
        }
    }

    // Empty impls: the test entity relies on the traits' default behavior.
    impl SanitizeAuto for TestEntity {}
    impl SanitizeCustom for TestEntity {}
    impl ValidateAuto for TestEntity {}
    impl ValidateCustom for TestEntity {}
    impl Visitable for TestEntity {}

    // Exposes `id` and `tag` by name; any other field name yields `None`.
    impl FieldValues for TestEntity {
        fn get_value(&self, field: &str) -> Option<Value> {
            match field {
                "id" => Some(Value::Ulid(self.id)),
                "tag" => Some(Value::Text(self.tag.clone())),
                _ => None,
            }
        }
    }

    // Marker type standing in for the canister that owns the stores.
    #[derive(Clone, Copy)]
    struct TestCanister;

    impl Path for TestCanister {
        const PATH: &'static str = CANISTER_PATH;
    }

    impl CanisterKind for TestCanister {}

    // Marker type for the data store the entity lives in.
    struct TestStore;

    impl Path for TestStore {
        const PATH: &'static str = DATA_STORE_PATH;
    }

    impl StoreKind for TestStore {
        type Canister = TestCanister;
    }

    impl EntityKind for TestEntity {
        type PrimaryKey = Ulid;
        type Store = TestStore;
        type Canister = TestCanister;

        const ENTITY_NAME: &'static str = "TestEntity";
        const PRIMARY_KEY: &'static str = "id";
        const FIELDS: &'static [&'static str] = &["id", "tag"];
        const INDEXES: &'static [&'static IndexModel] = &INDEXES;
        const MODEL: &'static EntityModel = &TEST_MODEL;

        fn key(&self) -> crate::key::Key {
            self.id.into()
        }

        fn primary_key(&self) -> Self::PrimaryKey {
            self.id
        }

        fn set_primary_key(&mut self, key: Self::PrimaryKey) {
            self.id = key;
        }
    }

    // Backing stores for the tests.
    // NOTE(review): 20 and 21 are the ids handed to `ic_memory!`; presumably they
    // must fall inside the range declared by `ic_memory_range!(0, 60)` below —
    // confirm against canic_memory.
    canic_memory::eager_static! {
        static TEST_DATA_STORE: RefCell<DataStore> =
            RefCell::new(DataStore::init(canic_memory::ic_memory!(DataStore, 20)));
    }

    canic_memory::eager_static! {
        static TEST_INDEX_STORE: RefCell<IndexStore> =
            RefCell::new(IndexStore::init(canic_memory::ic_memory!(IndexStore, 21)));
    }

    // Registries mapping the store paths to the stores above.
    thread_local! {
        static DATA_REGISTRY: DataStoreRegistry = {
            let mut reg = DataStoreRegistry::new();
            reg.register(DATA_STORE_PATH, &TEST_DATA_STORE);
            reg
        };

        static INDEX_REGISTRY: IndexStoreRegistry = {
            let mut reg = IndexStoreRegistry::new();
            reg.register(INDEX_STORE_PATH, &TEST_INDEX_STORE);
            reg
        };
    }

    // Database handle wired to the two registries.
    static DB: Db<TestCanister> = Db::new(&DATA_REGISTRY, &INDEX_REGISTRY);

    canic_memory::eager_init!({
        canic_memory::ic_memory_range!(0, 60);
    });

    // Empties both stores so a test does not see rows seeded earlier.
    fn reset_stores() {
        TEST_DATA_STORE.with_borrow_mut(|store| store.clear());
        TEST_INDEX_STORE.with_borrow_mut(|store| store.clear());
    }

    // Writes `entity` into the data store and a matching single-key index entry
    // into the index store, bypassing the planner under test.
    fn seed_entity(entity: &TestEntity) {
        let data_key = DataKey::new::<TestEntity>(entity.id);
        let raw_key = data_key.to_raw().expect("data key encode");
        let raw_row = RawRow::try_new(serialize(entity).unwrap()).unwrap();
        TEST_DATA_STORE.with_borrow_mut(|store| store.insert(raw_key, raw_row));

        let index_key = IndexKey::new(entity, &INDEX_MODEL)
            .expect("index key")
            .expect("index key missing");
        let raw_index_key = index_key.to_raw();
        let entry = IndexEntry::new(entity.key());
        let raw_entry = RawIndexEntry::try_from_entry(&entry).unwrap();
        TEST_INDEX_STORE.with_borrow_mut(|store| store.insert(raw_index_key, raw_entry));
    }

    // Inserting a second entity with the same `tag` must be reported as a conflict.
    // NOTE(review): `with_test_hash_override` presumably pins the index fingerprint
    // to the given bytes for the duration of the closure — confirm.
    #[test]
    fn unique_collision_equal_values_is_conflict() {
        with_test_hash_override([0xAA; 16], || {
            reset_stores();

            let existing = TestEntity {
                id: Ulid::from_u128(1),
                tag: "alpha".to_string(),
            };
            seed_entity(&existing);

            let incoming = TestEntity {
                id: Ulid::from_u128(2),
                tag: "alpha".to_string(),
            };

            let err = plan_index_mutation_for_entity::<TestEntity>(&DB, None, Some(&incoming))
                .expect_err("expected unique conflict");
            assert_eq!(err.class, ErrorClass::Conflict);
        });
    }

    // Different `tag` values that land on the same index entry (the hash is
    // overridden for both) must be reported as index corruption, not a conflict.
    #[test]
    fn unique_collision_different_values_is_corruption() {
        with_test_hash_override([0xAA; 16], || {
            reset_stores();

            let existing = TestEntity {
                id: Ulid::from_u128(1),
                tag: "alpha".to_string(),
            };
            seed_entity(&existing);

            let incoming = TestEntity {
                id: Ulid::from_u128(2),
                tag: "beta".to_string(),
            };

            let err = plan_index_mutation_for_entity::<TestEntity>(&DB, None, Some(&incoming))
                .expect_err("expected hash collision");
            assert_eq!(err.class, ErrorClass::Corruption);
            assert_eq!(err.origin, ErrorOrigin::Index);
        });
    }

    // The index points at key 1, but the row stored under key 1 decodes to an
    // entity whose own key is 2; the planner must flag the mismatch as corruption.
    #[test]
    fn unique_collision_row_key_mismatch_is_corruption() {
        reset_stores();

        let indexed = TestEntity {
            id: Ulid::from_u128(1),
            tag: "alpha".to_string(),
        };
        let corrupted = TestEntity {
            id: Ulid::from_u128(2),
            tag: "alpha".to_string(),
        };

        // Store the mismatching payload under the indexed entity's data key.
        let data_key = DataKey::new::<TestEntity>(indexed.id);
        let raw_key = data_key.to_raw().expect("data key encode");
        let raw_row = RawRow::try_new(serialize(&corrupted).unwrap()).unwrap();
        TEST_DATA_STORE.with_borrow_mut(|store| store.insert(raw_key, raw_row));

        // Index entry still references the indexed entity's key.
        let index_key = IndexKey::new(&indexed, &INDEX_MODEL)
            .expect("index key")
            .expect("index key missing");
        let raw_index_key = index_key.to_raw();
        let entry = IndexEntry::new(indexed.key());
        let raw_entry = RawIndexEntry::try_from_entry(&entry).unwrap();
        TEST_INDEX_STORE.with_borrow_mut(|store| store.insert(raw_index_key, raw_entry));

        let incoming = TestEntity {
            id: Ulid::from_u128(3),
            tag: "alpha".to_string(),
        };

        let err = plan_index_mutation_for_entity::<TestEntity>(&DB, None, Some(&incoming))
            .expect_err("expected row key mismatch corruption");
        assert_eq!(err.class, ErrorClass::Corruption);
        assert_eq!(err.origin, ErrorOrigin::Index);
    }
}