Skip to main content

icydb_core/db/index/plan/
mod.rs

1#[cfg(test)]
2mod tests;
3
4use crate::{
5    db::{
6        CommitIndexOp,
7        executor::ExecutorError,
8        index::{
9            IndexEntry, IndexEntryCorruption, IndexEntryEncodeError, IndexKey, IndexStore,
10            RawIndexEntry, RawIndexKey,
11        },
12        store::DataKey,
13    },
14    error::{ErrorClass, ErrorOrigin, InternalError},
15    model::index::IndexModel,
16    obs::sink::{self, MetricsEvent},
17    traits::{EntityKind, EntityValue, FieldValue, Storable},
18};
19use std::{cell::RefCell, collections::BTreeMap, thread::LocalKey};
20
21///
22/// IndexApplyPlan
23///
24
25#[derive(Debug)]
26pub struct IndexApplyPlan {
27    pub index: &'static IndexModel,
28    pub store: &'static LocalKey<RefCell<IndexStore>>,
29}
30
31///
32/// IndexMutationPlan
33///
34
35#[derive(Debug)]
36pub struct IndexMutationPlan {
37    pub apply: Vec<IndexApplyPlan>,
38    pub commit_ops: Vec<CommitIndexOp>,
39}
40
41/// Plan all index mutations for a single entity transition.
42///
43/// This function:
44/// - Loads existing index entries
45/// - Validates unique constraints
46/// - Computes the exact index writes/deletes required
47///
48/// All fallible work happens here. The returned plan is safe to apply
49/// infallibly after a commit marker is written.
50pub fn plan_index_mutation_for_entity<E: EntityKind + EntityValue>(
51    db: &crate::db::Db<E::Canister>,
52    old: Option<&E>,
53    new: Option<&E>,
54) -> Result<IndexMutationPlan, InternalError> {
55    let old_entity_key = old.map(EntityValue::id);
56    let new_entity_key = new.map(EntityValue::id);
57
58    let mut apply = Vec::with_capacity(E::INDEXES.len());
59    let mut commit_ops = Vec::new();
60
61    for index in E::INDEXES {
62        let store = db.with_index(|reg| reg.try_get_store(index.store))?;
63
64        let old_key = match old {
65            Some(entity) => IndexKey::new(entity, index)?,
66            None => None,
67        };
68        let new_key = match new {
69            Some(entity) => IndexKey::new(entity, index)?,
70            None => None,
71        };
72
73        let old_entry = load_existing_entry(store, index, old)?;
74        // Prevalidate membership so commit-phase mutations cannot surface corruption.
75        if let Some(old_key) = &old_key {
76            let Some(old_entity_key) = old_entity_key else {
77                return Err(InternalError::new(
78                    ErrorClass::Internal,
79                    ErrorOrigin::Index,
80                    "missing old entity key for index removal".to_string(),
81                ));
82            };
83            let entry = old_entry.as_ref().ok_or_else(|| {
84                ExecutorError::corruption(
85                    ErrorOrigin::Index,
86                    format!(
87                        "index corrupted: {} ({}) -> {}",
88                        E::PATH,
89                        index.fields.join(", "),
90                        IndexEntryCorruption::missing_key(old_key.to_raw(), old_entity_key)
91                    ),
92                )
93            })?;
94            if index.unique && entry.len() > 1 {
95                return Err(ExecutorError::corruption(
96                    ErrorOrigin::Index,
97                    format!(
98                        "index corrupted: {} ({}) -> {}",
99                        E::PATH,
100                        index.fields.join(", "),
101                        IndexEntryCorruption::NonUniqueEntry { keys: entry.len() }
102                    ),
103                )
104                .into());
105            }
106            if !entry.contains(old_entity_key) {
107                return Err(ExecutorError::corruption(
108                    ErrorOrigin::Index,
109                    format!(
110                        "index corrupted: {} ({}) -> {}",
111                        E::PATH,
112                        index.fields.join(", "),
113                        IndexEntryCorruption::missing_key(old_key.to_raw(), old_entity_key)
114                    ),
115                )
116                .into());
117            }
118        }
119        let new_entry = if old_key == new_key {
120            old_entry.clone()
121        } else {
122            load_existing_entry(store, index, new)?
123        };
124
125        validate_unique_constraint::<E>(
126            db,
127            index,
128            new_entry.as_ref(),
129            new_entity_key.as_ref(),
130            new,
131        )?;
132
133        build_commit_ops_for_index::<E>(
134            &mut commit_ops,
135            index,
136            old_key,
137            new_key,
138            old_entry,
139            new_entry,
140            old_entity_key,
141            new_entity_key,
142        )?;
143
144        apply.push(IndexApplyPlan { index, store });
145    }
146
147    Ok(IndexMutationPlan { apply, commit_ops })
148}
149
150fn load_existing_entry<E: EntityKind + EntityValue>(
151    store: &'static LocalKey<RefCell<IndexStore>>,
152    index: &'static IndexModel,
153    entity: Option<&E>,
154) -> Result<Option<IndexEntry<E>>, InternalError> {
155    let Some(entity) = entity else {
156        return Ok(None);
157    };
158    let Some(key) = IndexKey::new(entity, index)? else {
159        return Ok(None);
160    };
161
162    store
163        .with_borrow(|s| s.get(&key.to_raw()))
164        .map(|raw| {
165            raw.try_decode().map_err(|err| {
166                ExecutorError::corruption(
167                    ErrorOrigin::Index,
168                    format!(
169                        "index corrupted: {} ({}) -> {}",
170                        E::PATH,
171                        index.fields.join(", "),
172                        err
173                    ),
174                )
175                .into()
176            })
177        })
178        .transpose()
179}
180
181/// Validate unique index constraints against the existing index entry.
182///
183/// This detects:
184/// - Index corruption (multiple keys in a unique entry)
185/// - Uniqueness violations (conflicting key ownership)
186#[expect(clippy::too_many_lines)]
187fn validate_unique_constraint<E: EntityKind + EntityValue>(
188    db: &crate::db::Db<E::Canister>,
189    index: &IndexModel,
190    entry: Option<&IndexEntry<E>>,
191    new_key: Option<&E::Id>,
192    new_entity: Option<&E>,
193) -> Result<(), InternalError> {
194    if !index.unique {
195        return Ok(());
196    }
197
198    let Some(entry) = entry else {
199        return Ok(());
200    };
201
202    if entry.len() > 1 {
203        return Err(ExecutorError::corruption(
204            ErrorOrigin::Index,
205            format!(
206                "index corrupted: {} ({}) -> {} keys",
207                E::PATH,
208                index.fields.join(", "),
209                entry.len()
210            ),
211        )
212        .into());
213    }
214
215    let Some(new_key) = new_key else {
216        return Ok(());
217    };
218    if entry.contains(*new_key) {
219        return Ok(());
220    }
221
222    let Some(new_entity) = new_entity else {
223        return Err(InternalError::new(
224            ErrorClass::InvariantViolation,
225            ErrorOrigin::Index,
226            "missing entity payload during unique validation".to_string(),
227        ));
228    };
229    let existing_key = entry.single_id().ok_or_else(|| {
230        ExecutorError::corruption(
231            ErrorOrigin::Index,
232            format!(
233                "index corrupted: {} ({}) -> {} keys",
234                E::PATH,
235                index.fields.join(", "),
236                entry.len()
237            ),
238        )
239    })?;
240
241    let stored = {
242        let data_key = DataKey::try_new::<E>(existing_key)?;
243        let row = db.context::<E>().read_strict(&data_key)?;
244        row.try_decode::<E>().map_err(|err| {
245            ExecutorError::corruption(
246                ErrorOrigin::Serialize,
247                format!("failed to deserialize row: {data_key} ({err})"),
248            )
249        })?
250    };
251    let stored_key = stored.id();
252    if stored_key != existing_key {
253        // Stored row decoded successfully but key mismatch indicates index/data divergence; treat as corruption.
254        return Err(ExecutorError::corruption(
255            ErrorOrigin::Store,
256            format!(
257                "index corrupted: {} ({}) -> {}",
258                E::PATH,
259                index.fields.join(", "),
260                IndexEntryCorruption::RowKeyMismatch {
261                    indexed_key: Box::new(existing_key.to_value()),
262                    row_key: Box::new(stored_key.to_value()),
263                }
264            ),
265        )
266        .into());
267    }
268
269    for field in index.fields {
270        let expected = new_entity.get_value(field).ok_or_else(|| {
271            InternalError::new(
272                ErrorClass::InvariantViolation,
273                ErrorOrigin::Index,
274                format!(
275                    "index field missing on lookup entity: {} ({})",
276                    E::PATH,
277                    field
278                ),
279            )
280        })?;
281        let actual = stored.get_value(field).ok_or_else(|| {
282            ExecutorError::corruption(
283                ErrorOrigin::Index,
284                format!(
285                    "index corrupted: {} ({}) -> stored entity missing field",
286                    E::PATH,
287                    field
288                ),
289            )
290        })?;
291
292        if expected != actual {
293            return Err(ExecutorError::corruption(
294                ErrorOrigin::Index,
295                format!("index hash collision: {} ({})", E::PATH, field),
296            )
297            .into());
298        }
299    }
300
301    sink::record(MetricsEvent::UniqueViolation {
302        entity_path: E::PATH,
303    });
304
305    Err(ExecutorError::index_violation(E::PATH, index.fields).into())
306}
307
308/// Compute commit-time index operations for a single index.
309///
310/// Produces a minimal set of index updates:
311/// - `Some(bytes)` → insert/update index entry
312/// - `None`        → delete index entry
313///
314/// Correctly handles old/new key overlap and guarantees that
315/// apply-time mutations cannot fail except by invariant violation.
316#[allow(clippy::too_many_arguments)]
317fn build_commit_ops_for_index<E: EntityKind>(
318    commit_ops: &mut Vec<CommitIndexOp>,
319    index: &'static IndexModel,
320    old_key: Option<IndexKey>,
321    new_key: Option<IndexKey>,
322    old_entry: Option<IndexEntry<E>>,
323    new_entry: Option<IndexEntry<E>>,
324    old_entity_key: Option<E::Id>,
325    new_entity_key: Option<E::Id>,
326) -> Result<(), InternalError> {
327    let mut touched: BTreeMap<RawIndexKey, Option<IndexEntry<E>>> = BTreeMap::new();
328    let fields = index.fields.join(", ");
329
330    // ── Removal ────────────────────────────────
331
332    if let Some(old_key) = old_key {
333        let Some(old_entity_key) = old_entity_key else {
334            return Err(InternalError::new(
335                ErrorClass::Internal,
336                ErrorOrigin::Index,
337                "missing old entity key for index removal".to_string(),
338            ));
339        };
340
341        if let Some(mut entry) = old_entry {
342            entry.remove(old_entity_key);
343            let after = if entry.is_empty() { None } else { Some(entry) };
344            touched.insert(old_key.to_raw(), after);
345        } else {
346            // No existing index entry -> nothing to remove.
347            touched.insert(old_key.to_raw(), None);
348        }
349    }
350
351    // ── Insertion ──────────────────────────────
352
353    if let Some(new_key) = new_key {
354        let Some(new_entity_key) = new_entity_key else {
355            return Err(InternalError::new(
356                ErrorClass::Internal,
357                ErrorOrigin::Index,
358                "missing new entity key for index insertion".to_string(),
359            ));
360        };
361
362        let raw_key = new_key.to_raw();
363
364        // Start from:
365        //   1. result of removal (if same key)
366        //   2. existing entry loaded from store
367        //   3. brand new entry
368        let mut entry = if let Some(existing) = touched.remove(&raw_key) {
369            existing.unwrap_or_else(|| IndexEntry::new(new_entity_key))
370        } else if let Some(existing) = new_entry {
371            existing
372        } else {
373            IndexEntry::new(new_entity_key)
374        };
375
376        entry.insert(new_entity_key);
377        touched.insert(raw_key, Some(entry));
378    }
379
380    // ── Emit commit ops ────────────────────────
381
382    for (raw_key, entry) in touched {
383        let value = if let Some(entry) = entry {
384            let raw = RawIndexEntry::try_from(&entry).map_err(|err| match err {
385                IndexEntryEncodeError::TooManyKeys { keys } => InternalError::new(
386                    ErrorClass::Unsupported,
387                    ErrorOrigin::Index,
388                    format!(
389                        "index entry exceeds max keys: {} ({}) -> {} keys",
390                        E::PATH,
391                        fields,
392                        keys
393                    ),
394                ),
395                IndexEntryEncodeError::KeyEncoding(err) => InternalError::new(
396                    ErrorClass::Unsupported,
397                    ErrorOrigin::Index,
398                    format!(
399                        "index entry key encoding failed: {} ({}) -> {err}",
400                        E::PATH,
401                        fields
402                    ),
403                ),
404            })?;
405            Some(raw.into_bytes())
406        } else {
407            None
408        };
409
410        commit_ops.push(CommitIndexOp {
411            store: index.store.to_string(),
412            key: raw_key.as_bytes().to_vec(),
413            value,
414        });
415    }
416
417    Ok(())
418}