Skip to main content

icydb_core/db/index/plan/
mod.rs

1use crate::{
2    db::{
3        CommitIndexOp,
4        executor::ExecutorError,
5        index::{
6            IndexEntry, IndexEntryCorruption, IndexEntryEncodeError, IndexKey, IndexStore,
7            RawIndexEntry, RawIndexKey,
8        },
9        store::DataKey,
10    },
11    error::{ErrorClass, ErrorOrigin, InternalError},
12    model::index::IndexModel,
13    obs::sink::{self, MetricsEvent},
14    traits::{EntityKind, EntityValue, FieldValue, Storable},
15};
16use std::{cell::RefCell, collections::BTreeMap, thread::LocalKey};
17
18///
19/// IndexApplyPlan
20///
21
22#[derive(Debug)]
23pub struct IndexApplyPlan {
24    pub index: &'static IndexModel,
25    pub store: &'static LocalKey<RefCell<IndexStore>>,
26}
27
28///
29/// IndexMutationPlan
30///
31
32#[derive(Debug)]
33pub struct IndexMutationPlan {
34    pub apply: Vec<IndexApplyPlan>,
35    pub commit_ops: Vec<CommitIndexOp>,
36}
37
38/// Plan all index mutations for a single entity transition.
39///
40/// This function:
41/// - Loads existing index entries
42/// - Validates unique constraints
43/// - Computes the exact index writes/deletes required
44///
45/// All fallible work happens here. The returned plan is safe to apply
46/// infallibly after a commit marker is written.
47pub fn plan_index_mutation_for_entity<E: EntityKind + EntityValue>(
48    db: &crate::db::Db<E::Canister>,
49    old: Option<&E>,
50    new: Option<&E>,
51) -> Result<IndexMutationPlan, InternalError> {
52    let old_entity_key = old.map(|entity| entity.id().key());
53    let new_entity_key = new.map(|entity| entity.id().key());
54
55    let mut apply = Vec::with_capacity(E::INDEXES.len());
56    let mut commit_ops = Vec::new();
57
58    for index in E::INDEXES {
59        let store = db.with_index(|reg| reg.try_get_store(index.store))?;
60
61        let old_key = match old {
62            Some(entity) => IndexKey::new(entity, index)?,
63            None => None,
64        };
65        let new_key = match new {
66            Some(entity) => IndexKey::new(entity, index)?,
67            None => None,
68        };
69
70        let old_entry = load_existing_entry(store, index, old)?;
71        // Prevalidate membership so commit-phase mutations cannot surface corruption.
72        if let Some(old_key) = &old_key {
73            let Some(old_entity_key) = old_entity_key else {
74                return Err(InternalError::new(
75                    ErrorClass::Internal,
76                    ErrorOrigin::Index,
77                    "missing old entity key for index removal".to_string(),
78                ));
79            };
80            let entry = old_entry.as_ref().ok_or_else(|| {
81                ExecutorError::corruption(
82                    ErrorOrigin::Index,
83                    format!(
84                        "index corrupted: {} ({}) -> {}",
85                        E::PATH,
86                        index.fields.join(", "),
87                        IndexEntryCorruption::missing_key(old_key.to_raw(), old_entity_key)
88                    ),
89                )
90            })?;
91            if index.unique && entry.len() > 1 {
92                return Err(ExecutorError::corruption(
93                    ErrorOrigin::Index,
94                    format!(
95                        "index corrupted: {} ({}) -> {}",
96                        E::PATH,
97                        index.fields.join(", "),
98                        IndexEntryCorruption::NonUniqueEntry { keys: entry.len() }
99                    ),
100                )
101                .into());
102            }
103            if !entry.contains(old_entity_key) {
104                return Err(ExecutorError::corruption(
105                    ErrorOrigin::Index,
106                    format!(
107                        "index corrupted: {} ({}) -> {}",
108                        E::PATH,
109                        index.fields.join(", "),
110                        IndexEntryCorruption::missing_key(old_key.to_raw(), old_entity_key)
111                    ),
112                )
113                .into());
114            }
115        }
116        let new_entry = if old_key == new_key {
117            old_entry.clone()
118        } else {
119            load_existing_entry(store, index, new)?
120        };
121
122        validate_unique_constraint::<E>(
123            db,
124            index,
125            new_entry.as_ref(),
126            new_entity_key.as_ref(),
127            new,
128        )?;
129
130        build_commit_ops_for_index::<E>(
131            &mut commit_ops,
132            index,
133            old_key,
134            new_key,
135            old_entry,
136            new_entry,
137            old_entity_key,
138            new_entity_key,
139        )?;
140
141        apply.push(IndexApplyPlan { index, store });
142    }
143
144    Ok(IndexMutationPlan { apply, commit_ops })
145}
146
147fn load_existing_entry<E: EntityKind + EntityValue>(
148    store: &'static LocalKey<RefCell<IndexStore>>,
149    index: &'static IndexModel,
150    entity: Option<&E>,
151) -> Result<Option<IndexEntry<E>>, InternalError> {
152    let Some(entity) = entity else {
153        return Ok(None);
154    };
155    let Some(key) = IndexKey::new(entity, index)? else {
156        return Ok(None);
157    };
158
159    store
160        .with_borrow(|s| s.get(&key.to_raw()))
161        .map(|raw| {
162            raw.try_decode().map_err(|err| {
163                ExecutorError::corruption(
164                    ErrorOrigin::Index,
165                    format!(
166                        "index corrupted: {} ({}) -> {}",
167                        E::PATH,
168                        index.fields.join(", "),
169                        err
170                    ),
171                )
172                .into()
173            })
174        })
175        .transpose()
176}
177
178/// Validate unique index constraints against the existing index entry.
179///
180/// This detects:
181/// - Index corruption (multiple keys in a unique entry)
182/// - Uniqueness violations (conflicting key ownership)
183#[expect(clippy::too_many_lines)]
184fn validate_unique_constraint<E: EntityKind + EntityValue>(
185    db: &crate::db::Db<E::Canister>,
186    index: &IndexModel,
187    entry: Option<&IndexEntry<E>>,
188    new_key: Option<&E::Key>,
189    new_entity: Option<&E>,
190) -> Result<(), InternalError> {
191    if !index.unique {
192        return Ok(());
193    }
194
195    let Some(entry) = entry else {
196        return Ok(());
197    };
198
199    if entry.len() > 1 {
200        return Err(ExecutorError::corruption(
201            ErrorOrigin::Index,
202            format!(
203                "index corrupted: {} ({}) -> {} keys",
204                E::PATH,
205                index.fields.join(", "),
206                entry.len()
207            ),
208        )
209        .into());
210    }
211
212    let Some(new_key) = new_key else {
213        return Ok(());
214    };
215    if entry.contains(*new_key) {
216        return Ok(());
217    }
218
219    let Some(new_entity) = new_entity else {
220        return Err(InternalError::new(
221            ErrorClass::InvariantViolation,
222            ErrorOrigin::Index,
223            "missing entity payload during unique validation".to_string(),
224        ));
225    };
226    let existing_key = entry.single_id().ok_or_else(|| {
227        ExecutorError::corruption(
228            ErrorOrigin::Index,
229            format!(
230                "index corrupted: {} ({}) -> {} keys",
231                E::PATH,
232                index.fields.join(", "),
233                entry.len()
234            ),
235        )
236    })?;
237
238    let stored = {
239        let data_key = DataKey::try_new::<E>(existing_key)?;
240        let row = db.context::<E>().read_strict(&data_key)?;
241        row.try_decode::<E>().map_err(|err| {
242            ExecutorError::corruption(
243                ErrorOrigin::Serialize,
244                format!("failed to deserialize row: {data_key} ({err})"),
245            )
246        })?
247    };
248    let stored_key = stored.id().key();
249    if stored_key != existing_key {
250        // Stored row decoded successfully but key mismatch indicates index/data divergence; treat as corruption.
251        return Err(ExecutorError::corruption(
252            ErrorOrigin::Store,
253            format!(
254                "index corrupted: {} ({}) -> {}",
255                E::PATH,
256                index.fields.join(", "),
257                IndexEntryCorruption::RowKeyMismatch {
258                    indexed_key: Box::new(existing_key.to_value()),
259                    row_key: Box::new(stored_key.to_value()),
260                }
261            ),
262        )
263        .into());
264    }
265
266    for field in index.fields {
267        let expected = new_entity.get_value(field).ok_or_else(|| {
268            InternalError::new(
269                ErrorClass::InvariantViolation,
270                ErrorOrigin::Index,
271                format!(
272                    "index field missing on lookup entity: {} ({})",
273                    E::PATH,
274                    field
275                ),
276            )
277        })?;
278        let actual = stored.get_value(field).ok_or_else(|| {
279            ExecutorError::corruption(
280                ErrorOrigin::Index,
281                format!(
282                    "index corrupted: {} ({}) -> stored entity missing field",
283                    E::PATH,
284                    field
285                ),
286            )
287        })?;
288
289        if expected != actual {
290            return Err(ExecutorError::corruption(
291                ErrorOrigin::Index,
292                format!("index hash collision: {} ({})", E::PATH, field),
293            )
294            .into());
295        }
296    }
297
298    sink::record(MetricsEvent::UniqueViolation {
299        entity_path: E::PATH,
300    });
301
302    Err(ExecutorError::index_violation(E::PATH, index.fields).into())
303}
304
305/// Compute commit-time index operations for a single index.
306///
307/// Produces a minimal set of index updates:
308/// - `Some(bytes)` → insert/update index entry
309/// - `None`        → delete index entry
310///
311/// Correctly handles old/new key overlap and guarantees that
312/// apply-time mutations cannot fail except by invariant violation.
313#[allow(clippy::too_many_arguments)]
314fn build_commit_ops_for_index<E: EntityKind>(
315    commit_ops: &mut Vec<CommitIndexOp>,
316    index: &'static IndexModel,
317    old_key: Option<IndexKey>,
318    new_key: Option<IndexKey>,
319    old_entry: Option<IndexEntry<E>>,
320    new_entry: Option<IndexEntry<E>>,
321    old_entity_key: Option<E::Key>,
322    new_entity_key: Option<E::Key>,
323) -> Result<(), InternalError> {
324    let mut touched: BTreeMap<RawIndexKey, Option<IndexEntry<E>>> = BTreeMap::new();
325    let fields = index.fields.join(", ");
326
327    // ── Removal ────────────────────────────────
328
329    if let Some(old_key) = old_key {
330        let Some(old_entity_key) = old_entity_key else {
331            return Err(InternalError::new(
332                ErrorClass::Internal,
333                ErrorOrigin::Index,
334                "missing old entity key for index removal".to_string(),
335            ));
336        };
337
338        if let Some(mut entry) = old_entry {
339            entry.remove(old_entity_key);
340            let after = if entry.is_empty() { None } else { Some(entry) };
341            touched.insert(old_key.to_raw(), after);
342        } else {
343            // No existing index entry -> nothing to remove.
344            touched.insert(old_key.to_raw(), None);
345        }
346    }
347
348    // ── Insertion ──────────────────────────────
349
350    if let Some(new_key) = new_key {
351        let Some(new_entity_key) = new_entity_key else {
352            return Err(InternalError::new(
353                ErrorClass::Internal,
354                ErrorOrigin::Index,
355                "missing new entity key for index insertion".to_string(),
356            ));
357        };
358
359        let raw_key = new_key.to_raw();
360
361        // Start from:
362        //   1. result of removal (if same key)
363        //   2. existing entry loaded from store
364        //   3. brand new entry
365        let mut entry = if let Some(existing) = touched.remove(&raw_key) {
366            existing.unwrap_or_else(|| IndexEntry::new(new_entity_key))
367        } else if let Some(existing) = new_entry {
368            existing
369        } else {
370            IndexEntry::new(new_entity_key)
371        };
372
373        entry.insert(new_entity_key);
374        touched.insert(raw_key, Some(entry));
375    }
376
377    // ── Emit commit ops ────────────────────────
378
379    for (raw_key, entry) in touched {
380        let value = if let Some(entry) = entry {
381            let raw = RawIndexEntry::try_from(&entry).map_err(|err| match err {
382                IndexEntryEncodeError::TooManyKeys { keys } => InternalError::new(
383                    ErrorClass::Unsupported,
384                    ErrorOrigin::Index,
385                    format!(
386                        "index entry exceeds max keys: {} ({}) -> {} keys",
387                        E::PATH,
388                        fields,
389                        keys
390                    ),
391                ),
392                IndexEntryEncodeError::KeyEncoding(err) => InternalError::new(
393                    ErrorClass::Unsupported,
394                    ErrorOrigin::Index,
395                    format!(
396                        "index entry key encoding failed: {} ({}) -> {err}",
397                        E::PATH,
398                        fields
399                    ),
400                ),
401            })?;
402            Some(raw.into_bytes())
403        } else {
404            None
405        };
406
407        commit_ops.push(CommitIndexOp {
408            store: index.store.to_string(),
409            key: raw_key.as_bytes().to_vec(),
410            value,
411        });
412    }
413
414    Ok(())
415}