1#[cfg(test)]
2mod tests;
3
4use crate::{
5 db::{
6 CommitIndexOp,
7 executor::ExecutorError,
8 index::{
9 IndexEntry, IndexEntryCorruption, IndexEntryEncodeError, IndexKey, IndexStore,
10 RawIndexEntry, RawIndexKey,
11 },
12 store::DataKey,
13 },
14 error::{ErrorClass, ErrorOrigin, InternalError},
15 model::index::IndexModel,
16 obs::sink::{self, MetricsEvent},
17 traits::{EntityKind, EntityValue, FieldValue, Storable},
18};
19use std::{cell::RefCell, collections::BTreeMap, thread::LocalKey};
20
21#[derive(Debug)]
26pub struct IndexApplyPlan {
27 pub index: &'static IndexModel,
28 pub store: &'static LocalKey<RefCell<IndexStore>>,
29}
30
31#[derive(Debug)]
36pub struct IndexMutationPlan {
37 pub apply: Vec<IndexApplyPlan>,
38 pub commit_ops: Vec<CommitIndexOp>,
39}
40
41pub fn plan_index_mutation_for_entity<E: EntityKind + EntityValue>(
51 db: &crate::db::Db<E::Canister>,
52 old: Option<&E>,
53 new: Option<&E>,
54) -> Result<IndexMutationPlan, InternalError> {
55 let old_entity_key = old.map(EntityValue::id);
56 let new_entity_key = new.map(EntityValue::id);
57
58 let mut apply = Vec::with_capacity(E::INDEXES.len());
59 let mut commit_ops = Vec::new();
60
61 for index in E::INDEXES {
62 let store = db.with_index(|reg| reg.try_get_store(index.store))?;
63
64 let old_key = match old {
65 Some(entity) => IndexKey::new(entity, index)?,
66 None => None,
67 };
68 let new_key = match new {
69 Some(entity) => IndexKey::new(entity, index)?,
70 None => None,
71 };
72
73 let old_entry = load_existing_entry(store, index, old)?;
74 if let Some(old_key) = &old_key {
76 let Some(old_entity_key) = old_entity_key else {
77 return Err(InternalError::new(
78 ErrorClass::Internal,
79 ErrorOrigin::Index,
80 "missing old entity key for index removal".to_string(),
81 ));
82 };
83 let entry = old_entry.as_ref().ok_or_else(|| {
84 ExecutorError::corruption(
85 ErrorOrigin::Index,
86 format!(
87 "index corrupted: {} ({}) -> {}",
88 E::PATH,
89 index.fields.join(", "),
90 IndexEntryCorruption::missing_key(old_key.to_raw(), old_entity_key)
91 ),
92 )
93 })?;
94 if index.unique && entry.len() > 1 {
95 return Err(ExecutorError::corruption(
96 ErrorOrigin::Index,
97 format!(
98 "index corrupted: {} ({}) -> {}",
99 E::PATH,
100 index.fields.join(", "),
101 IndexEntryCorruption::NonUniqueEntry { keys: entry.len() }
102 ),
103 )
104 .into());
105 }
106 if !entry.contains(old_entity_key) {
107 return Err(ExecutorError::corruption(
108 ErrorOrigin::Index,
109 format!(
110 "index corrupted: {} ({}) -> {}",
111 E::PATH,
112 index.fields.join(", "),
113 IndexEntryCorruption::missing_key(old_key.to_raw(), old_entity_key)
114 ),
115 )
116 .into());
117 }
118 }
119 let new_entry = if old_key == new_key {
120 old_entry.clone()
121 } else {
122 load_existing_entry(store, index, new)?
123 };
124
125 validate_unique_constraint::<E>(
126 db,
127 index,
128 new_entry.as_ref(),
129 new_entity_key.as_ref(),
130 new,
131 )?;
132
133 build_commit_ops_for_index::<E>(
134 &mut commit_ops,
135 index,
136 old_key,
137 new_key,
138 old_entry,
139 new_entry,
140 old_entity_key,
141 new_entity_key,
142 )?;
143
144 apply.push(IndexApplyPlan { index, store });
145 }
146
147 Ok(IndexMutationPlan { apply, commit_ops })
148}
149
150fn load_existing_entry<E: EntityKind + EntityValue>(
151 store: &'static LocalKey<RefCell<IndexStore>>,
152 index: &'static IndexModel,
153 entity: Option<&E>,
154) -> Result<Option<IndexEntry<E>>, InternalError> {
155 let Some(entity) = entity else {
156 return Ok(None);
157 };
158 let Some(key) = IndexKey::new(entity, index)? else {
159 return Ok(None);
160 };
161
162 store
163 .with_borrow(|s| s.get(&key.to_raw()))
164 .map(|raw| {
165 raw.try_decode().map_err(|err| {
166 ExecutorError::corruption(
167 ErrorOrigin::Index,
168 format!(
169 "index corrupted: {} ({}) -> {}",
170 E::PATH,
171 index.fields.join(", "),
172 err
173 ),
174 )
175 .into()
176 })
177 })
178 .transpose()
179}
180
181#[expect(clippy::too_many_lines)]
187fn validate_unique_constraint<E: EntityKind + EntityValue>(
188 db: &crate::db::Db<E::Canister>,
189 index: &IndexModel,
190 entry: Option<&IndexEntry<E>>,
191 new_key: Option<&E::Id>,
192 new_entity: Option<&E>,
193) -> Result<(), InternalError> {
194 if !index.unique {
195 return Ok(());
196 }
197
198 let Some(entry) = entry else {
199 return Ok(());
200 };
201
202 if entry.len() > 1 {
203 return Err(ExecutorError::corruption(
204 ErrorOrigin::Index,
205 format!(
206 "index corrupted: {} ({}) -> {} keys",
207 E::PATH,
208 index.fields.join(", "),
209 entry.len()
210 ),
211 )
212 .into());
213 }
214
215 let Some(new_key) = new_key else {
216 return Ok(());
217 };
218 if entry.contains(*new_key) {
219 return Ok(());
220 }
221
222 let Some(new_entity) = new_entity else {
223 return Err(InternalError::new(
224 ErrorClass::InvariantViolation,
225 ErrorOrigin::Index,
226 "missing entity payload during unique validation".to_string(),
227 ));
228 };
229 let existing_key = entry.single_id().ok_or_else(|| {
230 ExecutorError::corruption(
231 ErrorOrigin::Index,
232 format!(
233 "index corrupted: {} ({}) -> {} keys",
234 E::PATH,
235 index.fields.join(", "),
236 entry.len()
237 ),
238 )
239 })?;
240
241 let stored = {
242 let data_key = DataKey::try_new::<E>(existing_key)?;
243 let row = db.context::<E>().read_strict(&data_key)?;
244 row.try_decode::<E>().map_err(|err| {
245 ExecutorError::corruption(
246 ErrorOrigin::Serialize,
247 format!("failed to deserialize row: {data_key} ({err})"),
248 )
249 })?
250 };
251 let stored_key = stored.id();
252 if stored_key != existing_key {
253 return Err(ExecutorError::corruption(
255 ErrorOrigin::Store,
256 format!(
257 "index corrupted: {} ({}) -> {}",
258 E::PATH,
259 index.fields.join(", "),
260 IndexEntryCorruption::RowKeyMismatch {
261 indexed_key: Box::new(existing_key.to_value()),
262 row_key: Box::new(stored_key.to_value()),
263 }
264 ),
265 )
266 .into());
267 }
268
269 for field in index.fields {
270 let expected = new_entity.get_value(field).ok_or_else(|| {
271 InternalError::new(
272 ErrorClass::InvariantViolation,
273 ErrorOrigin::Index,
274 format!(
275 "index field missing on lookup entity: {} ({})",
276 E::PATH,
277 field
278 ),
279 )
280 })?;
281 let actual = stored.get_value(field).ok_or_else(|| {
282 ExecutorError::corruption(
283 ErrorOrigin::Index,
284 format!(
285 "index corrupted: {} ({}) -> stored entity missing field",
286 E::PATH,
287 field
288 ),
289 )
290 })?;
291
292 if expected != actual {
293 return Err(ExecutorError::corruption(
294 ErrorOrigin::Index,
295 format!("index hash collision: {} ({})", E::PATH, field),
296 )
297 .into());
298 }
299 }
300
301 sink::record(MetricsEvent::UniqueViolation {
302 entity_path: E::PATH,
303 });
304
305 Err(ExecutorError::index_violation(E::PATH, index.fields).into())
306}
307
308#[allow(clippy::too_many_arguments)]
317fn build_commit_ops_for_index<E: EntityKind>(
318 commit_ops: &mut Vec<CommitIndexOp>,
319 index: &'static IndexModel,
320 old_key: Option<IndexKey>,
321 new_key: Option<IndexKey>,
322 old_entry: Option<IndexEntry<E>>,
323 new_entry: Option<IndexEntry<E>>,
324 old_entity_key: Option<E::Id>,
325 new_entity_key: Option<E::Id>,
326) -> Result<(), InternalError> {
327 let mut touched: BTreeMap<RawIndexKey, Option<IndexEntry<E>>> = BTreeMap::new();
328 let fields = index.fields.join(", ");
329
330 if let Some(old_key) = old_key {
333 let Some(old_entity_key) = old_entity_key else {
334 return Err(InternalError::new(
335 ErrorClass::Internal,
336 ErrorOrigin::Index,
337 "missing old entity key for index removal".to_string(),
338 ));
339 };
340
341 if let Some(mut entry) = old_entry {
342 entry.remove(old_entity_key);
343 let after = if entry.is_empty() { None } else { Some(entry) };
344 touched.insert(old_key.to_raw(), after);
345 } else {
346 touched.insert(old_key.to_raw(), None);
348 }
349 }
350
351 if let Some(new_key) = new_key {
354 let Some(new_entity_key) = new_entity_key else {
355 return Err(InternalError::new(
356 ErrorClass::Internal,
357 ErrorOrigin::Index,
358 "missing new entity key for index insertion".to_string(),
359 ));
360 };
361
362 let raw_key = new_key.to_raw();
363
364 let mut entry = if let Some(existing) = touched.remove(&raw_key) {
369 existing.unwrap_or_else(|| IndexEntry::new(new_entity_key))
370 } else if let Some(existing) = new_entry {
371 existing
372 } else {
373 IndexEntry::new(new_entity_key)
374 };
375
376 entry.insert(new_entity_key);
377 touched.insert(raw_key, Some(entry));
378 }
379
380 for (raw_key, entry) in touched {
383 let value = if let Some(entry) = entry {
384 let raw = RawIndexEntry::try_from(&entry).map_err(|err| match err {
385 IndexEntryEncodeError::TooManyKeys { keys } => InternalError::new(
386 ErrorClass::Unsupported,
387 ErrorOrigin::Index,
388 format!(
389 "index entry exceeds max keys: {} ({}) -> {} keys",
390 E::PATH,
391 fields,
392 keys
393 ),
394 ),
395 IndexEntryEncodeError::KeyEncoding(err) => InternalError::new(
396 ErrorClass::Unsupported,
397 ErrorOrigin::Index,
398 format!(
399 "index entry key encoding failed: {} ({}) -> {err}",
400 E::PATH,
401 fields
402 ),
403 ),
404 })?;
405 Some(raw.into_bytes())
406 } else {
407 None
408 };
409
410 commit_ops.push(CommitIndexOp {
411 store: index.store.to_string(),
412 key: raw_key.as_bytes().to_vec(),
413 value,
414 });
415 }
416
417 Ok(())
418}