1use crate::{
2 db::{
3 CommitIndexOp,
4 executor::ExecutorError,
5 index::{
6 IndexEntry, IndexEntryCorruption, IndexEntryEncodeError, IndexKey, IndexStore,
7 RawIndexEntry, RawIndexKey,
8 },
9 store::DataKey,
10 },
11 error::{ErrorClass, ErrorOrigin, InternalError},
12 key::Key,
13 model::index::IndexModel,
14 obs::sink::{self, MetricsEvent},
15 traits::{EntityKind, Storable},
16};
17use std::{cell::RefCell, collections::BTreeMap, thread::LocalKey};
18
19#[derive(Debug)]
24pub struct IndexApplyPlan {
25 pub index: &'static IndexModel,
26 pub store: &'static LocalKey<RefCell<IndexStore>>,
27}
28
29#[derive(Debug)]
34pub struct IndexMutationPlan {
35 pub apply: Vec<IndexApplyPlan>,
36 pub commit_ops: Vec<CommitIndexOp>,
37}
38
39pub fn plan_index_mutation_for_entity<E: EntityKind>(
49 db: &crate::db::Db<E::Canister>,
50 old: Option<&E>,
51 new: Option<&E>,
52) -> Result<IndexMutationPlan, InternalError> {
53 let old_entity_key = old.map(EntityKind::key);
54 let new_entity_key = new.map(EntityKind::key);
55
56 let mut apply = Vec::with_capacity(E::INDEXES.len());
57 let mut commit_ops = Vec::new();
58
59 for index in E::INDEXES {
60 let store = db.with_index(|reg| reg.try_get_store(index.store))?;
61
62 let old_key = match old {
63 Some(entity) => IndexKey::new(entity, index)?,
64 None => None,
65 };
66 let new_key = match new {
67 Some(entity) => IndexKey::new(entity, index)?,
68 None => None,
69 };
70
71 let old_entry = load_existing_entry(store, index, old)?;
72 if let Some(old_key) = &old_key {
74 let Some(old_entity_key) = old_entity_key else {
75 return Err(InternalError::new(
76 ErrorClass::Internal,
77 ErrorOrigin::Index,
78 "missing old entity key for index removal".to_string(),
79 ));
80 };
81 let entry = old_entry.as_ref().ok_or_else(|| {
82 ExecutorError::corruption(
83 ErrorOrigin::Index,
84 format!(
85 "index corrupted: {} ({}) -> {}",
86 E::PATH,
87 index.fields.join(", "),
88 IndexEntryCorruption::missing_key(old_key.to_raw(), old_entity_key)
89 ),
90 )
91 })?;
92 if index.unique && entry.len() > 1 {
93 return Err(ExecutorError::corruption(
94 ErrorOrigin::Index,
95 format!(
96 "index corrupted: {} ({}) -> {}",
97 E::PATH,
98 index.fields.join(", "),
99 IndexEntryCorruption::NonUniqueEntry { keys: entry.len() }
100 ),
101 )
102 .into());
103 }
104 if !entry.contains(&old_entity_key) {
105 return Err(ExecutorError::corruption(
106 ErrorOrigin::Index,
107 format!(
108 "index corrupted: {} ({}) -> {}",
109 E::PATH,
110 index.fields.join(", "),
111 IndexEntryCorruption::missing_key(old_key.to_raw(), old_entity_key)
112 ),
113 )
114 .into());
115 }
116 }
117 let new_entry = if old_key == new_key {
118 old_entry.clone()
119 } else {
120 load_existing_entry(store, index, new)?
121 };
122
123 validate_unique_constraint::<E>(
124 db,
125 index,
126 new_entry.as_ref(),
127 new_entity_key.as_ref(),
128 new,
129 )?;
130
131 build_commit_ops_for_index::<E>(
132 &mut commit_ops,
133 index,
134 old_key,
135 new_key,
136 old_entry,
137 new_entry,
138 old_entity_key,
139 new_entity_key,
140 )?;
141
142 apply.push(IndexApplyPlan { index, store });
143 }
144
145 Ok(IndexMutationPlan { apply, commit_ops })
146}
147
148fn load_existing_entry<E: EntityKind>(
149 store: &'static LocalKey<RefCell<IndexStore>>,
150 index: &'static IndexModel,
151 entity: Option<&E>,
152) -> Result<Option<IndexEntry>, InternalError> {
153 let Some(entity) = entity else {
154 return Ok(None);
155 };
156 let Some(key) = IndexKey::new(entity, index)? else {
157 return Ok(None);
158 };
159
160 store
161 .with_borrow(|s| s.get(&key.to_raw()))
162 .map(|raw| {
163 raw.try_decode().map_err(|err| {
164 ExecutorError::corruption(
165 ErrorOrigin::Index,
166 format!(
167 "index corrupted: {} ({}) -> {}",
168 E::PATH,
169 index.fields.join(", "),
170 err
171 ),
172 )
173 .into()
174 })
175 })
176 .transpose()
177}
178
179fn validate_unique_constraint<E: EntityKind>(
185 db: &crate::db::Db<E::Canister>,
186 index: &IndexModel,
187 entry: Option<&IndexEntry>,
188 new_key: Option<&Key>,
189 new_entity: Option<&E>,
190) -> Result<(), InternalError> {
191 if !index.unique {
192 return Ok(());
193 }
194
195 let Some(entry) = entry else {
196 return Ok(());
197 };
198
199 if entry.len() > 1 {
200 return Err(ExecutorError::corruption(
201 ErrorOrigin::Index,
202 format!(
203 "index corrupted: {} ({}) -> {} keys",
204 E::PATH,
205 index.fields.join(", "),
206 entry.len()
207 ),
208 )
209 .into());
210 }
211
212 let Some(new_key) = new_key else {
213 return Ok(());
214 };
215 if entry.contains(new_key) {
216 return Ok(());
217 }
218
219 let Some(new_entity) = new_entity else {
220 return Err(InternalError::new(
221 ErrorClass::InvariantViolation,
222 ErrorOrigin::Index,
223 "missing entity payload during unique validation".to_string(),
224 ));
225 };
226 let existing_key = entry.single_key().ok_or_else(|| {
227 ExecutorError::corruption(
228 ErrorOrigin::Index,
229 format!(
230 "index corrupted: {} ({}) -> {} keys",
231 E::PATH,
232 index.fields.join(", "),
233 entry.len()
234 ),
235 )
236 })?;
237
238 let stored = {
239 let data_key = DataKey::new::<E>(existing_key);
240 let row = db.context::<E>().read_strict(&data_key)?;
241 row.try_decode::<E>().map_err(|err| {
242 ExecutorError::corruption(
243 ErrorOrigin::Serialize,
244 format!("failed to deserialize row: {data_key} ({err})"),
245 )
246 })?
247 };
248
249 for field in index.fields {
250 let expected = new_entity.get_value(field).ok_or_else(|| {
251 InternalError::new(
252 ErrorClass::InvariantViolation,
253 ErrorOrigin::Index,
254 format!(
255 "index field missing on lookup entity: {} ({})",
256 E::PATH,
257 field
258 ),
259 )
260 })?;
261 let actual = stored.get_value(field).ok_or_else(|| {
262 InternalError::new(
263 ErrorClass::InvariantViolation,
264 ErrorOrigin::Index,
265 format!(
266 "index field missing on stored entity: {} ({})",
267 E::PATH,
268 field
269 ),
270 )
271 })?;
272
273 if expected != actual {
274 return Err(ExecutorError::corruption(
275 ErrorOrigin::Index,
276 format!("index hash collision: {} ({})", E::PATH, field),
277 )
278 .into());
279 }
280 }
281
282 sink::record(MetricsEvent::UniqueViolation {
283 entity_path: E::PATH,
284 });
285
286 Err(ExecutorError::index_violation(E::PATH, index.fields).into())
287}
288
289#[allow(clippy::too_many_arguments)]
298fn build_commit_ops_for_index<E: EntityKind>(
299 commit_ops: &mut Vec<CommitIndexOp>,
300 index: &'static IndexModel,
301 old_key: Option<IndexKey>,
302 new_key: Option<IndexKey>,
303 old_entry: Option<IndexEntry>,
304 new_entry: Option<IndexEntry>,
305 old_entity_key: Option<Key>,
306 new_entity_key: Option<Key>,
307) -> Result<(), InternalError> {
308 let mut touched: BTreeMap<RawIndexKey, Option<IndexEntry>> = BTreeMap::new();
309 let fields = index.fields.join(", ");
310
311 if let Some(old_key) = old_key {
314 let Some(old_entity_key) = old_entity_key else {
315 return Err(InternalError::new(
316 ErrorClass::Internal,
317 ErrorOrigin::Index,
318 "missing old entity key for index removal".to_string(),
319 ));
320 };
321
322 if let Some(mut entry) = old_entry {
323 entry.remove_key(&old_entity_key);
324 let after = if entry.is_empty() { None } else { Some(entry) };
325 touched.insert(old_key.to_raw(), after);
326 } else {
327 touched.insert(old_key.to_raw(), None);
329 }
330 }
331
332 if let Some(new_key) = new_key {
335 let Some(new_entity_key) = new_entity_key else {
336 return Err(InternalError::new(
337 ErrorClass::Internal,
338 ErrorOrigin::Index,
339 "missing new entity key for index insertion".to_string(),
340 ));
341 };
342
343 let raw_key = new_key.to_raw();
344
345 let mut entry = if let Some(existing) = touched.remove(&raw_key) {
350 existing.unwrap_or_else(|| IndexEntry::new(new_entity_key))
351 } else if let Some(existing) = new_entry {
352 existing
353 } else {
354 IndexEntry::new(new_entity_key)
355 };
356
357 entry.insert_key(new_entity_key);
358 touched.insert(raw_key, Some(entry));
359 }
360
361 for (raw_key, entry) in touched {
364 let value = if let Some(entry) = entry {
365 let raw = RawIndexEntry::try_from_entry(&entry).map_err(|err| match err {
366 IndexEntryEncodeError::TooManyKeys { keys } => InternalError::new(
367 ErrorClass::Unsupported,
368 ErrorOrigin::Index,
369 format!(
370 "index entry exceeds max keys: {} ({}) -> {} keys",
371 E::PATH,
372 fields,
373 keys
374 ),
375 ),
376 IndexEntryEncodeError::KeyEncoding(err) => InternalError::new(
377 ErrorClass::Unsupported,
378 ErrorOrigin::Index,
379 format!(
380 "index entry key encoding failed: {} ({}) -> {err}",
381 E::PATH,
382 fields
383 ),
384 ),
385 })?;
386 Some(raw.into_bytes())
387 } else {
388 None
389 };
390
391 commit_ops.push(CommitIndexOp {
392 store: index.store.to_string(),
393 key: raw_key.as_bytes().to_vec(),
394 value,
395 });
396 }
397
398 Ok(())
399}
400
401pub fn load_existing_index_entry<E: EntityKind>(
425 store: &'static LocalKey<RefCell<IndexStore>>,
426 index: &'static IndexModel,
427 entity: Option<&E>,
428) -> Result<Option<IndexEntry>, InternalError> {
429 let Some(entity) = entity else {
430 return Ok(None);
431 };
432 let Some(key) = IndexKey::new(entity, index)? else {
433 return Ok(None);
434 };
435
436 store
437 .with_borrow(|s| s.get(&key.to_raw()))
438 .map(|raw| {
439 raw.try_decode().map_err(|err| {
440 ExecutorError::corruption(
441 ErrorOrigin::Index,
442 format!(
443 "index corrupted: {} ({}) -> {}",
444 E::PATH,
445 index.fields.join(", "),
446 err
447 ),
448 )
449 .into()
450 })
451 })
452 .transpose()
453}
454
455#[cfg(test)]
456mod tests {
457 use super::*;
458 use crate::{
459 db::{
460 Db,
461 index::IndexStoreRegistry,
462 index::fingerprint::with_test_hash_override,
463 store::{DataStore, DataStoreRegistry, RawRow},
464 },
465 error::{ErrorClass, ErrorOrigin},
466 model::{
467 entity::EntityModel,
468 field::{EntityFieldKind, EntityFieldModel},
469 index::IndexModel,
470 },
471 serialize::serialize,
472 traits::{
473 CanisterKind, EntityKind, FieldValues, Path, SanitizeAuto, SanitizeCustom, StoreKind,
474 ValidateAuto, ValidateCustom, View, ViewError, Visitable,
475 },
476 types::Ulid,
477 value::Value,
478 };
479 use serde::{Deserialize, Serialize};
480 use std::cell::RefCell;
481
482 const CANISTER_PATH: &str = "index_plan_test::TestCanister";
483 const DATA_STORE_PATH: &str = "index_plan_test::TestDataStore";
484 const INDEX_STORE_PATH: &str = "index_plan_test::TestIndexStore";
485 const ENTITY_PATH: &str = "index_plan_test::TestEntity";
486
487 const INDEX_FIELDS: [&str; 1] = ["tag"];
488 const INDEX_MODEL: IndexModel = IndexModel::new(
489 "index_plan_test::idx_tag",
490 INDEX_STORE_PATH,
491 &INDEX_FIELDS,
492 true,
493 );
494 const INDEXES: [&IndexModel; 1] = [&INDEX_MODEL];
495
496 const TEST_FIELDS: [EntityFieldModel; 2] = [
497 EntityFieldModel {
498 name: "id",
499 kind: EntityFieldKind::Ulid,
500 },
501 EntityFieldModel {
502 name: "tag",
503 kind: EntityFieldKind::Text,
504 },
505 ];
506 const TEST_MODEL: EntityModel = EntityModel {
507 path: ENTITY_PATH,
508 entity_name: "TestEntity",
509 primary_key: &TEST_FIELDS[0],
510 fields: &TEST_FIELDS,
511 indexes: &INDEXES,
512 };
513
514 #[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
515 struct TestEntity {
516 id: Ulid,
517 tag: String,
518 }
519
520 impl Path for TestEntity {
521 const PATH: &'static str = ENTITY_PATH;
522 }
523
524 impl View for TestEntity {
525 type ViewType = Self;
526
527 fn to_view(&self) -> Self::ViewType {
528 self.clone()
529 }
530
531 fn from_view(view: Self::ViewType) -> Result<Self, ViewError> {
532 Ok(view)
533 }
534 }
535
536 impl SanitizeAuto for TestEntity {}
537 impl SanitizeCustom for TestEntity {}
538 impl ValidateAuto for TestEntity {}
539 impl ValidateCustom for TestEntity {}
540 impl Visitable for TestEntity {}
541
542 impl FieldValues for TestEntity {
543 fn get_value(&self, field: &str) -> Option<Value> {
544 match field {
545 "id" => Some(Value::Ulid(self.id)),
546 "tag" => Some(Value::Text(self.tag.clone())),
547 _ => None,
548 }
549 }
550 }
551
552 #[derive(Clone, Copy)]
553 struct TestCanister;
554
555 impl Path for TestCanister {
556 const PATH: &'static str = CANISTER_PATH;
557 }
558
559 impl CanisterKind for TestCanister {}
560
561 struct TestStore;
562
563 impl Path for TestStore {
564 const PATH: &'static str = DATA_STORE_PATH;
565 }
566
567 impl StoreKind for TestStore {
568 type Canister = TestCanister;
569 }
570
571 impl EntityKind for TestEntity {
572 type PrimaryKey = Ulid;
573 type Store = TestStore;
574 type Canister = TestCanister;
575
576 const ENTITY_NAME: &'static str = "TestEntity";
577 const PRIMARY_KEY: &'static str = "id";
578 const FIELDS: &'static [&'static str] = &["id", "tag"];
579 const INDEXES: &'static [&'static IndexModel] = &INDEXES;
580 const MODEL: &'static EntityModel = &TEST_MODEL;
581
582 fn key(&self) -> crate::key::Key {
583 self.id.into()
584 }
585
586 fn primary_key(&self) -> Self::PrimaryKey {
587 self.id
588 }
589
590 fn set_primary_key(&mut self, key: Self::PrimaryKey) {
591 self.id = key;
592 }
593 }
594
595 canic_memory::eager_static! {
596 static TEST_DATA_STORE: RefCell<DataStore> =
597 RefCell::new(DataStore::init(canic_memory::ic_memory!(DataStore, 20)));
598 }
599
600 canic_memory::eager_static! {
601 static TEST_INDEX_STORE: RefCell<IndexStore> =
602 RefCell::new(IndexStore::init(canic_memory::ic_memory!(IndexStore, 21)));
603 }
604
605 thread_local! {
606 static DATA_REGISTRY: DataStoreRegistry = {
607 let mut reg = DataStoreRegistry::new();
608 reg.register(DATA_STORE_PATH, &TEST_DATA_STORE);
609 reg
610 };
611
612 static INDEX_REGISTRY: IndexStoreRegistry = {
613 let mut reg = IndexStoreRegistry::new();
614 reg.register(INDEX_STORE_PATH, &TEST_INDEX_STORE);
615 reg
616 };
617 }
618
619 static DB: Db<TestCanister> = Db::new(&DATA_REGISTRY, &INDEX_REGISTRY);
620
621 canic_memory::eager_init!({
622 canic_memory::ic_memory_range!(0, 60);
623 });
624
625 fn reset_stores() {
626 TEST_DATA_STORE.with_borrow_mut(|store| store.clear());
627 TEST_INDEX_STORE.with_borrow_mut(|store| store.clear());
628 }
629
630 fn seed_entity(entity: &TestEntity) {
631 let data_key = DataKey::new::<TestEntity>(entity.id);
632 let raw_key = data_key.to_raw().expect("data key encode");
633 let raw_row = RawRow::try_new(serialize(entity).unwrap()).unwrap();
634 TEST_DATA_STORE.with_borrow_mut(|store| store.insert(raw_key, raw_row));
635
636 let index_key = IndexKey::new(entity, &INDEX_MODEL)
637 .expect("index key")
638 .expect("index key missing");
639 let raw_index_key = index_key.to_raw();
640 let entry = IndexEntry::new(entity.key());
641 let raw_entry = RawIndexEntry::try_from_entry(&entry).unwrap();
642 TEST_INDEX_STORE.with_borrow_mut(|store| store.insert(raw_index_key, raw_entry));
643 }
644
645 #[test]
646 fn unique_collision_equal_values_is_conflict() {
647 with_test_hash_override([0xAA; 16], || {
648 reset_stores();
649
650 let existing = TestEntity {
651 id: Ulid::from_u128(1),
652 tag: "alpha".to_string(),
653 };
654 seed_entity(&existing);
655
656 let incoming = TestEntity {
657 id: Ulid::from_u128(2),
658 tag: "alpha".to_string(),
659 };
660
661 let err = plan_index_mutation_for_entity::<TestEntity>(&DB, None, Some(&incoming))
662 .expect_err("expected unique conflict");
663 assert_eq!(err.class, ErrorClass::Conflict);
664 });
665 }
666
667 #[test]
668 fn unique_collision_different_values_is_corruption() {
669 with_test_hash_override([0xAA; 16], || {
670 reset_stores();
671
672 let existing = TestEntity {
673 id: Ulid::from_u128(1),
674 tag: "alpha".to_string(),
675 };
676 seed_entity(&existing);
677
678 let incoming = TestEntity {
679 id: Ulid::from_u128(2),
680 tag: "beta".to_string(),
681 };
682
683 let err = plan_index_mutation_for_entity::<TestEntity>(&DB, None, Some(&incoming))
684 .expect_err("expected hash collision");
685 assert_eq!(err.class, ErrorClass::Corruption);
686 assert_eq!(err.origin, ErrorOrigin::Index);
687 });
688 }
689}