1use crate::{
2 db::{
3 CommitIndexOp,
4 executor::ExecutorError,
5 index::{
6 IndexEntry, IndexEntryCorruption, IndexEntryEncodeError, IndexKey, IndexStore,
7 RawIndexEntry, RawIndexKey,
8 },
9 store::DataKey,
10 },
11 error::{ErrorClass, ErrorOrigin, InternalError},
12 model::index::IndexModel,
13 obs::sink::{self, MetricsEvent},
14 traits::{EntityKind, EntityValue, FieldValue, Storable},
15};
16use std::{cell::RefCell, collections::BTreeMap, thread::LocalKey};
17
18#[derive(Debug)]
23pub struct IndexApplyPlan {
24 pub index: &'static IndexModel,
25 pub store: &'static LocalKey<RefCell<IndexStore>>,
26}
27
28#[derive(Debug)]
33pub struct IndexMutationPlan {
34 pub apply: Vec<IndexApplyPlan>,
35 pub commit_ops: Vec<CommitIndexOp>,
36}
37
38pub fn plan_index_mutation_for_entity<E: EntityKind + EntityValue>(
48 db: &crate::db::Db<E::Canister>,
49 old: Option<&E>,
50 new: Option<&E>,
51) -> Result<IndexMutationPlan, InternalError> {
52 let old_entity_key = old.map(|entity| entity.id().key());
53 let new_entity_key = new.map(|entity| entity.id().key());
54
55 let mut apply = Vec::with_capacity(E::INDEXES.len());
56 let mut commit_ops = Vec::new();
57
58 for index in E::INDEXES {
59 let store = db.with_index(|reg| reg.try_get_store(index.store))?;
60
61 let old_key = match old {
62 Some(entity) => IndexKey::new(entity, index)?,
63 None => None,
64 };
65 let new_key = match new {
66 Some(entity) => IndexKey::new(entity, index)?,
67 None => None,
68 };
69
70 let old_entry = load_existing_entry(store, index, old)?;
71 if let Some(old_key) = &old_key {
73 let Some(old_entity_key) = old_entity_key else {
74 return Err(InternalError::new(
75 ErrorClass::Internal,
76 ErrorOrigin::Index,
77 "missing old entity key for index removal".to_string(),
78 ));
79 };
80 let entry = old_entry.as_ref().ok_or_else(|| {
81 ExecutorError::corruption(
82 ErrorOrigin::Index,
83 format!(
84 "index corrupted: {} ({}) -> {}",
85 E::PATH,
86 index.fields.join(", "),
87 IndexEntryCorruption::missing_key(old_key.to_raw(), old_entity_key)
88 ),
89 )
90 })?;
91 if index.unique && entry.len() > 1 {
92 return Err(ExecutorError::corruption(
93 ErrorOrigin::Index,
94 format!(
95 "index corrupted: {} ({}) -> {}",
96 E::PATH,
97 index.fields.join(", "),
98 IndexEntryCorruption::NonUniqueEntry { keys: entry.len() }
99 ),
100 )
101 .into());
102 }
103 if !entry.contains(old_entity_key) {
104 return Err(ExecutorError::corruption(
105 ErrorOrigin::Index,
106 format!(
107 "index corrupted: {} ({}) -> {}",
108 E::PATH,
109 index.fields.join(", "),
110 IndexEntryCorruption::missing_key(old_key.to_raw(), old_entity_key)
111 ),
112 )
113 .into());
114 }
115 }
116 let new_entry = if old_key == new_key {
117 old_entry.clone()
118 } else {
119 load_existing_entry(store, index, new)?
120 };
121
122 validate_unique_constraint::<E>(
123 db,
124 index,
125 new_entry.as_ref(),
126 new_entity_key.as_ref(),
127 new,
128 )?;
129
130 build_commit_ops_for_index::<E>(
131 &mut commit_ops,
132 index,
133 old_key,
134 new_key,
135 old_entry,
136 new_entry,
137 old_entity_key,
138 new_entity_key,
139 )?;
140
141 apply.push(IndexApplyPlan { index, store });
142 }
143
144 Ok(IndexMutationPlan { apply, commit_ops })
145}
146
147fn load_existing_entry<E: EntityKind + EntityValue>(
148 store: &'static LocalKey<RefCell<IndexStore>>,
149 index: &'static IndexModel,
150 entity: Option<&E>,
151) -> Result<Option<IndexEntry<E>>, InternalError> {
152 let Some(entity) = entity else {
153 return Ok(None);
154 };
155 let Some(key) = IndexKey::new(entity, index)? else {
156 return Ok(None);
157 };
158
159 store
160 .with_borrow(|s| s.get(&key.to_raw()))
161 .map(|raw| {
162 raw.try_decode().map_err(|err| {
163 ExecutorError::corruption(
164 ErrorOrigin::Index,
165 format!(
166 "index corrupted: {} ({}) -> {}",
167 E::PATH,
168 index.fields.join(", "),
169 err
170 ),
171 )
172 .into()
173 })
174 })
175 .transpose()
176}
177
178#[expect(clippy::too_many_lines)]
184fn validate_unique_constraint<E: EntityKind + EntityValue>(
185 db: &crate::db::Db<E::Canister>,
186 index: &IndexModel,
187 entry: Option<&IndexEntry<E>>,
188 new_key: Option<&E::Key>,
189 new_entity: Option<&E>,
190) -> Result<(), InternalError> {
191 if !index.unique {
192 return Ok(());
193 }
194
195 let Some(entry) = entry else {
196 return Ok(());
197 };
198
199 if entry.len() > 1 {
200 return Err(ExecutorError::corruption(
201 ErrorOrigin::Index,
202 format!(
203 "index corrupted: {} ({}) -> {} keys",
204 E::PATH,
205 index.fields.join(", "),
206 entry.len()
207 ),
208 )
209 .into());
210 }
211
212 let Some(new_key) = new_key else {
213 return Ok(());
214 };
215 if entry.contains(*new_key) {
216 return Ok(());
217 }
218
219 let Some(new_entity) = new_entity else {
220 return Err(InternalError::new(
221 ErrorClass::InvariantViolation,
222 ErrorOrigin::Index,
223 "missing entity payload during unique validation".to_string(),
224 ));
225 };
226 let existing_key = entry.single_id().ok_or_else(|| {
227 ExecutorError::corruption(
228 ErrorOrigin::Index,
229 format!(
230 "index corrupted: {} ({}) -> {} keys",
231 E::PATH,
232 index.fields.join(", "),
233 entry.len()
234 ),
235 )
236 })?;
237
238 let stored = {
239 let data_key = DataKey::try_new::<E>(existing_key)?;
240 let row = db.context::<E>().read_strict(&data_key)?;
241 row.try_decode::<E>().map_err(|err| {
242 ExecutorError::corruption(
243 ErrorOrigin::Serialize,
244 format!("failed to deserialize row: {data_key} ({err})"),
245 )
246 })?
247 };
248 let stored_key = stored.id().key();
249 if stored_key != existing_key {
250 return Err(ExecutorError::corruption(
252 ErrorOrigin::Store,
253 format!(
254 "index corrupted: {} ({}) -> {}",
255 E::PATH,
256 index.fields.join(", "),
257 IndexEntryCorruption::RowKeyMismatch {
258 indexed_key: Box::new(existing_key.to_value()),
259 row_key: Box::new(stored_key.to_value()),
260 }
261 ),
262 )
263 .into());
264 }
265
266 for field in index.fields {
267 let expected = new_entity.get_value(field).ok_or_else(|| {
268 InternalError::new(
269 ErrorClass::InvariantViolation,
270 ErrorOrigin::Index,
271 format!(
272 "index field missing on lookup entity: {} ({})",
273 E::PATH,
274 field
275 ),
276 )
277 })?;
278 let actual = stored.get_value(field).ok_or_else(|| {
279 ExecutorError::corruption(
280 ErrorOrigin::Index,
281 format!(
282 "index corrupted: {} ({}) -> stored entity missing field",
283 E::PATH,
284 field
285 ),
286 )
287 })?;
288
289 if expected != actual {
290 return Err(ExecutorError::corruption(
291 ErrorOrigin::Index,
292 format!("index hash collision: {} ({})", E::PATH, field),
293 )
294 .into());
295 }
296 }
297
298 sink::record(MetricsEvent::UniqueViolation {
299 entity_path: E::PATH,
300 });
301
302 Err(ExecutorError::index_violation(E::PATH, index.fields).into())
303}
304
305#[allow(clippy::too_many_arguments)]
314fn build_commit_ops_for_index<E: EntityKind>(
315 commit_ops: &mut Vec<CommitIndexOp>,
316 index: &'static IndexModel,
317 old_key: Option<IndexKey>,
318 new_key: Option<IndexKey>,
319 old_entry: Option<IndexEntry<E>>,
320 new_entry: Option<IndexEntry<E>>,
321 old_entity_key: Option<E::Key>,
322 new_entity_key: Option<E::Key>,
323) -> Result<(), InternalError> {
324 let mut touched: BTreeMap<RawIndexKey, Option<IndexEntry<E>>> = BTreeMap::new();
325 let fields = index.fields.join(", ");
326
327 if let Some(old_key) = old_key {
330 let Some(old_entity_key) = old_entity_key else {
331 return Err(InternalError::new(
332 ErrorClass::Internal,
333 ErrorOrigin::Index,
334 "missing old entity key for index removal".to_string(),
335 ));
336 };
337
338 if let Some(mut entry) = old_entry {
339 entry.remove(old_entity_key);
340 let after = if entry.is_empty() { None } else { Some(entry) };
341 touched.insert(old_key.to_raw(), after);
342 } else {
343 touched.insert(old_key.to_raw(), None);
345 }
346 }
347
348 if let Some(new_key) = new_key {
351 let Some(new_entity_key) = new_entity_key else {
352 return Err(InternalError::new(
353 ErrorClass::Internal,
354 ErrorOrigin::Index,
355 "missing new entity key for index insertion".to_string(),
356 ));
357 };
358
359 let raw_key = new_key.to_raw();
360
361 let mut entry = if let Some(existing) = touched.remove(&raw_key) {
366 existing.unwrap_or_else(|| IndexEntry::new(new_entity_key))
367 } else if let Some(existing) = new_entry {
368 existing
369 } else {
370 IndexEntry::new(new_entity_key)
371 };
372
373 entry.insert(new_entity_key);
374 touched.insert(raw_key, Some(entry));
375 }
376
377 for (raw_key, entry) in touched {
380 let value = if let Some(entry) = entry {
381 let raw = RawIndexEntry::try_from(&entry).map_err(|err| match err {
382 IndexEntryEncodeError::TooManyKeys { keys } => InternalError::new(
383 ErrorClass::Unsupported,
384 ErrorOrigin::Index,
385 format!(
386 "index entry exceeds max keys: {} ({}) -> {} keys",
387 E::PATH,
388 fields,
389 keys
390 ),
391 ),
392 IndexEntryEncodeError::KeyEncoding(err) => InternalError::new(
393 ErrorClass::Unsupported,
394 ErrorOrigin::Index,
395 format!(
396 "index entry key encoding failed: {} ({}) -> {err}",
397 E::PATH,
398 fields
399 ),
400 ),
401 })?;
402 Some(raw.into_bytes())
403 } else {
404 None
405 };
406
407 commit_ops.push(CommitIndexOp {
408 store: index.store.to_string(),
409 key: raw_key.as_bytes().to_vec(),
410 value,
411 });
412 }
413
414 Ok(())
415}