// icydb_core/db/index/store.rs
1use crate::db::index::{
2    entry::{MAX_INDEX_ENTRY_BYTES, RawIndexEntry},
3    key::RawIndexKey,
4};
5use crate::traits::Storable;
6
7use canic_cdk::structures::{
8    BTreeMap, DefaultMemoryImpl, memory::VirtualMemory, storable::Bound as StorableBound,
9};
10use canic_utils::hash::Xxh3;
11use std::borrow::Cow;
12
13///
14/// IndexStore
15///
16/// Architectural Notes:
17///
18/// - Thin persistence wrapper over a stable BTreeMap.
19/// - RawIndexKey and RawIndexEntry are fully validated before insertion.
20/// - Fingerprints are non-authoritative diagnostic witnesses.
21/// - Fingerprints are always stored, but only verified in debug builds.
22/// - This layer does NOT enforce commit/transaction discipline.
23///   Higher layers are responsible for write coordination.
24/// - IndexStore intentionally does NOT implement Deref to avoid leaking
25///   internal storage representation (StoredIndexValue).
26///
27
pub struct IndexStore {
    // Stable-memory backed ordered map from raw index keys to stored values
    // (entry + diagnostic fingerprint).
    map: BTreeMap<RawIndexKey, StoredIndexValue, VirtualMemory<DefaultMemoryImpl>>,
    // Monotonic mutation counter, bumped (saturating) on every insert/remove/clear.
    generation: u64,
}
32
33impl IndexStore {
34    #[must_use]
35    pub fn init(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
36        Self {
37            map: BTreeMap::init(memory),
38            generation: 0,
39        }
40    }
41
42    /// Snapshot all index entry pairs (diagnostics only).
43    pub(crate) fn entries(&self) -> Vec<(RawIndexKey, RawIndexEntry)> {
44        self.map
45            .iter()
46            .map(|entry| (entry.key().clone(), entry.value().entry))
47            .collect()
48    }
49
50    pub(in crate::db) fn get(&self, key: &RawIndexKey) -> Option<RawIndexEntry> {
51        let value = self.map.get(key);
52
53        #[cfg(debug_assertions)]
54        if let Some(ref stored) = value {
55            Self::verify_if_debug(key, stored);
56        }
57
58        value.map(|stored| stored.entry)
59    }
60
61    pub fn len(&self) -> u64 {
62        self.map.len()
63    }
64
65    pub fn is_empty(&self) -> bool {
66        self.map.is_empty()
67    }
68
69    #[must_use]
70    pub(in crate::db) const fn generation(&self) -> u64 {
71        self.generation
72    }
73
74    pub(crate) fn insert(
75        &mut self,
76        key: RawIndexKey,
77        entry: RawIndexEntry,
78    ) -> Option<RawIndexEntry> {
79        let fingerprint = Self::entry_fingerprint(&key, &entry);
80
81        let stored = StoredIndexValue { entry, fingerprint };
82        let previous = self.map.insert(key, stored).map(|prev| prev.entry);
83        self.bump_generation();
84        previous
85    }
86
87    pub(crate) fn remove(&mut self, key: &RawIndexKey) -> Option<RawIndexEntry> {
88        let previous = self.map.remove(key).map(|prev| prev.entry);
89        self.bump_generation();
90        previous
91    }
92
93    pub fn clear(&mut self) {
94        self.map.clear();
95        self.bump_generation();
96    }
97
98    /// Sum of bytes used by all stored index entries.
99    pub fn memory_bytes(&self) -> u64 {
100        self.map
101            .iter()
102            .map(|entry| {
103                entry.key().as_bytes().len() as u64
104                    + entry.value().entry.len() as u64
105                    + u64::from(RawIndexFingerprint::STORED_SIZE)
106            })
107            .sum()
108    }
109
110    const fn bump_generation(&mut self) {
111        self.generation = self.generation.saturating_add(1);
112    }
113
114    fn entry_fingerprint(key: &RawIndexKey, entry: &RawIndexEntry) -> RawIndexFingerprint {
115        const VERSION: u8 = 1;
116
117        let mut hasher = Xxh3::with_seed(0);
118        hasher.update(&[VERSION]);
119        hasher.update(key.as_bytes());
120        hasher.update(entry.as_bytes());
121
122        RawIndexFingerprint(hasher.digest128().to_be_bytes())
123    }
124
125    #[cfg(debug_assertions)]
126    fn verify_if_debug(key: &RawIndexKey, stored: &StoredIndexValue) {
127        let expected = Self::entry_fingerprint(key, &stored.entry);
128
129        debug_assert!(
130            stored.fingerprint == expected,
131            "debug invariant violation: index fingerprint mismatch"
132        );
133    }
134}
135
136///
137/// StoredIndexValue
138///
139/// Raw entry plus non-authoritative diagnostic fingerprint.
140/// Encoded as: [RawIndexEntry bytes | 16-byte fingerprint]
141///
142
#[derive(Clone, Debug)]
struct StoredIndexValue {
    // The authoritative, validated raw index entry payload.
    entry: RawIndexEntry,
    // Diagnostic-only xxh3-128 fingerprint over (version, key, entry);
    // always stored, verified only in debug builds (see `verify_if_debug`).
    fingerprint: RawIndexFingerprint,
}
148
impl StoredIndexValue {
    // Worst-case encoded size: maximal entry payload plus the fixed
    // 16-byte trailing fingerprint.
    const STORED_SIZE: u32 = MAX_INDEX_ENTRY_BYTES + RawIndexFingerprint::STORED_SIZE;
}
152
153impl Storable for StoredIndexValue {
154    fn to_bytes(&self) -> Cow<'_, [u8]> {
155        Cow::Owned(self.clone().into_bytes())
156    }
157
158    fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
159        let bytes = bytes.as_ref();
160
161        let (entry_bytes, fingerprint_bytes) =
162            if bytes.len() < RawIndexFingerprint::STORED_SIZE as usize {
163                (bytes, &[][..])
164            } else {
165                bytes.split_at(bytes.len() - RawIndexFingerprint::STORED_SIZE as usize)
166            };
167
168        let mut out = [0u8; 16];
169        if fingerprint_bytes.len() == out.len() {
170            out.copy_from_slice(fingerprint_bytes);
171        }
172
173        Self {
174            entry: RawIndexEntry::from_bytes(Cow::Borrowed(entry_bytes)),
175            fingerprint: RawIndexFingerprint(out),
176        }
177    }
178
179    fn into_bytes(self) -> Vec<u8> {
180        let mut bytes = self.entry.into_bytes();
181        bytes.extend_from_slice(&self.fingerprint.0);
182        bytes
183    }
184
185    const BOUND: StorableBound = StorableBound::Bounded {
186        max_size: Self::STORED_SIZE,
187        is_fixed_size: false,
188    };
189}
190
191///
192/// RawIndexFingerprint
193///
194
/// 16-byte (128-bit) diagnostic fingerprint; big-endian xxh3-128 digest bytes
/// (see `IndexStore::entry_fingerprint`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) struct RawIndexFingerprint([u8; 16]);
197
impl RawIndexFingerprint {
    /// Fixed encoded size of a fingerprint, in bytes.
    pub(crate) const STORED_SIZE: u32 = 16;
}
201
202impl Storable for RawIndexFingerprint {
203    fn to_bytes(&self) -> Cow<'_, [u8]> {
204        Cow::Borrowed(&self.0)
205    }
206
207    fn from_bytes(bytes: Cow<'_, [u8]>) -> Self {
208        let mut out = [0u8; 16];
209        if bytes.len() == out.len() {
210            out.copy_from_slice(bytes.as_ref());
211        }
212        Self(out)
213    }
214
215    fn into_bytes(self) -> Vec<u8> {
216        self.0.to_vec()
217    }
218
219    const BOUND: StorableBound = StorableBound::Bounded {
220        max_size: Self::STORED_SIZE,
221        is_fixed_size: true,
222    };
223}
224
225use crate::{
226    db::{
227        data::DataKey,
228        direction::Direction,
229        index::{
230            IndexKey, continuation_advanced, envelope_is_empty,
231            predicate::{IndexPredicateExecution, eval_index_execution_on_decoded_key},
232            range::anchor_within_envelope,
233            resume_bounds_from_refs,
234        },
235    },
236    error::InternalError,
237    model::index::IndexModel,
238    traits::EntityKind,
239};
240use std::ops::Bound;
241
impl IndexStore {
    /// Scan the raw key range `bounds` in `direction`, decoding stored entries
    /// into `DataKey`s until `limit` keys are collected or the range is
    /// exhausted.
    ///
    /// - `continuation_start_exclusive` is a resume anchor: it is first
    ///   validated against the envelope, then the scan restarts strictly past
    ///   it via `resume_bounds_from_refs`.
    /// - `index_predicate_execution`, when present, filters on the decoded
    ///   key; rejected entries contribute no keys but do not stop the scan.
    /// - `limit == 0` short-circuits to an empty result.
    ///
    /// # Errors
    /// Invariant violations (anchor outside the envelope, non-advancing
    /// continuation) or index corruption detected while decoding.
    pub(in crate::db) fn resolve_data_values_in_raw_range_limited<E: EntityKind>(
        &self,
        index: &IndexModel,
        bounds: (&Bound<RawIndexKey>, &Bound<RawIndexKey>),
        continuation_start_exclusive: Option<&RawIndexKey>,
        direction: Direction,
        limit: usize,
        index_predicate_execution: Option<IndexPredicateExecution<'_>>,
    ) -> Result<Vec<DataKey>, InternalError> {
        if limit == 0 {
            return Ok(Vec::new());
        }

        Self::ensure_anchor_within_envelope(direction, continuation_start_exclusive, bounds)?;

        // Tighten the envelope past the anchor when resuming; otherwise scan
        // the caller-provided bounds as-is.
        let (start_raw, end_raw) = match continuation_start_exclusive {
            Some(anchor) => resume_bounds_from_refs(direction, bounds.0, bounds.1, anchor),
            None => (bounds.0.clone(), bounds.1.clone()),
        };

        if envelope_is_empty(&start_raw, &end_raw) {
            return Ok(Vec::new());
        }

        let mut out = Vec::new();

        // The two arms are identical except for iteration order (`.rev()` for
        // DESC); `decode_index_entry_and_push` returns true once `limit` is hit.
        match direction {
            Direction::Asc => {
                for entry in self.map.range((start_raw, end_raw)) {
                    let raw_key = entry.key();
                    let value = entry.value();

                    Self::ensure_continuation_advanced(
                        direction,
                        raw_key,
                        continuation_start_exclusive,
                    )?;

                    if Self::decode_index_entry_and_push::<E>(
                        index,
                        raw_key,
                        &value,
                        &mut out,
                        Some(limit),
                        "range resolve",
                        index_predicate_execution,
                    )? {
                        return Ok(out);
                    }
                }
            }
            Direction::Desc => {
                for entry in self.map.range((start_raw, end_raw)).rev() {
                    let raw_key = entry.key();
                    let value = entry.value();

                    Self::ensure_continuation_advanced(
                        direction,
                        raw_key,
                        continuation_start_exclusive,
                    )?;

                    if Self::decode_index_entry_and_push::<E>(
                        index,
                        raw_key,
                        &value,
                        &mut out,
                        Some(limit),
                        "range resolve",
                        index_predicate_execution,
                    )? {
                        return Ok(out);
                    }
                }
            }
        }

        Ok(out)
    }

    // Validate strict continuation advancement when an anchor is present.
    //
    // IMPORTANT CROSS-LAYER CONTRACT:
    // - Planner/cursor-spine validation ensures envelope/signature compatibility.
    // - This store-layer guard independently enforces strict monotonic advancement.
    // - Keep both layers explicit; do not collapse this into planner-only checks.
    //
    // No-op (Ok) when `anchor` is None.
    fn ensure_continuation_advanced(
        direction: Direction,
        candidate: &RawIndexKey,
        anchor: Option<&RawIndexKey>,
    ) -> Result<(), InternalError> {
        if let Some(anchor) = anchor
            && !continuation_advanced(direction, candidate, anchor)
        {
            return Err(InternalError::index_invariant(
                "index-range continuation scan did not advance beyond the anchor",
            ));
        }

        Ok(())
    }

    // Validate that continuation anchor is contained by the original range envelope.
    //
    // Keep this guard in the store layer even though planner/cursor validation already
    // checks containment: this is a defensive contract check against cross-layer misuse.
    //
    // No-op (Ok) when `anchor` is None.
    fn ensure_anchor_within_envelope(
        direction: Direction,
        anchor: Option<&RawIndexKey>,
        bounds: (&Bound<RawIndexKey>, &Bound<RawIndexKey>),
    ) -> Result<(), InternalError> {
        if let Some(anchor) = anchor
            && !anchor_within_envelope(direction, anchor, bounds.0, bounds.1)
        {
            return Err(InternalError::index_invariant(
                "index-range continuation anchor is outside the requested range envelope",
            ));
        }

        Ok(())
    }

    // Decode one stored entry and append its data keys to `out`.
    //
    // Returns Ok(true) once `out` reaches `limit` (the caller stops scanning),
    // Ok(false) otherwise. Entries rejected by the predicate append nothing
    // and return Ok(false). `context` labels corruption errors for diagnosis.
    fn decode_index_entry_and_push<E: EntityKind>(
        index: &IndexModel,
        raw_key: &RawIndexKey,
        value: &StoredIndexValue,
        out: &mut Vec<DataKey>,
        limit: Option<usize>,
        context: &'static str,
        index_predicate_execution: Option<IndexPredicateExecution<'_>>,
    ) -> Result<bool, InternalError> {
        // Debug-only fingerprint verification of the stored value.
        #[cfg(debug_assertions)]
        Self::verify_if_debug(raw_key, value);

        let decoded_key = IndexKey::try_from_raw(raw_key).map_err(|err| {
            InternalError::index_corruption(format!("index key corrupted during {context}: {err}"))
        })?;

        // Predicate filtering operates on the decoded key, before paying for
        // entry decoding.
        if let Some(execution) = index_predicate_execution
            && !eval_index_execution_on_decoded_key(&decoded_key, execution)?
        {
            return Ok(false);
        }

        let storage_keys = value
            .entry
            .decode_keys()
            .map_err(|err| InternalError::index_corruption(err.to_string()))?;

        // A unique index must map each key to exactly one storage key.
        if index.unique && storage_keys.len() != 1 {
            return Err(InternalError::index_corruption(
                "unique index entry contains an unexpected number of keys",
            ));
        }

        for storage_key in storage_keys {
            out.push(DataKey::from_key::<E>(storage_key));

            if let Some(limit) = limit
                && out.len() == limit
            {
                return Ok(true);
            }
        }

        Ok(false)
    }
}
411
412//
413// TESTS
414//
415
#[cfg(test)]
mod tests {
    use crate::{
        db::{direction::Direction, index::store::RawIndexKey},
        error::{ErrorClass, ErrorOrigin},
        traits::Storable,
    };
    use std::{borrow::Cow, ops::Bound};

    use super::IndexStore;

    /// Build a one-byte raw index key for exercising the store-layer guards.
    fn key_of(byte: u8) -> RawIndexKey {
        <RawIndexKey as Storable>::from_bytes(Cow::Owned(vec![byte]))
    }

    #[test]
    fn continuation_advancement_guard_rejects_non_advanced_candidate_asc() {
        // ASC: a candidate equal to the anchor has not advanced.
        let anchor = key_of(0x10);
        let candidate = key_of(0x10);

        let outcome =
            IndexStore::ensure_continuation_advanced(Direction::Asc, &candidate, Some(&anchor));
        let err =
            outcome.expect_err("ASC continuation candidate equal to anchor must be rejected");

        assert_eq!(err.class, ErrorClass::InvariantViolation);
        assert_eq!(err.origin, ErrorOrigin::Index);
    }

    #[test]
    fn continuation_advancement_guard_rejects_non_advanced_candidate_desc() {
        // DESC: the candidate must be strictly *before* the anchor; a larger
        // key has not advanced in descending order.
        let anchor = key_of(0x10);
        let candidate = key_of(0x11);

        let outcome =
            IndexStore::ensure_continuation_advanced(Direction::Desc, &candidate, Some(&anchor));
        let err = outcome.expect_err(
            "DESC continuation candidate not strictly after anchor must be rejected",
        );

        assert_eq!(err.class, ErrorClass::InvariantViolation);
        assert_eq!(err.origin, ErrorOrigin::Index);
    }

    #[test]
    fn anchor_containment_guard_rejects_out_of_envelope_anchor() {
        // Anchor sits exactly on the excluded upper bound, i.e. outside the
        // envelope [0x10, 0x20).
        let lower = Bound::Included(key_of(0x10));
        let upper = Bound::Excluded(key_of(0x20));
        let anchor = key_of(0x20);

        let outcome = IndexStore::ensure_anchor_within_envelope(
            Direction::Asc,
            Some(&anchor),
            (&lower, &upper),
        );
        let err = outcome.expect_err("out-of-envelope continuation anchor must be rejected");

        assert_eq!(err.class, ErrorClass::InvariantViolation);
        assert_eq!(err.origin, ErrorOrigin::Index);
    }
}
475}