Skip to main content

icydb_core/db/index/
store.rs

1//! Module: index::store
2//! Responsibility: journaled-or-heap index-entry storage behind the index-store boundary.
3//! Does not own: range-scan resolution, continuation semantics, or predicate execution.
4//! Boundary: scan/executor layers depend on this storage boundary.
5
6use crate::db::{
7    direction::Direction,
8    index::{
9        IndexEntryValue, IndexId, IndexKeyKind, cardinality::IndexPrefixCardinality,
10        key::RawIndexStoreKey,
11    },
12    ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
13};
14
15use candid::CandidType;
16use ic_memory::stable_structures::{
17    BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
18};
19use serde::Deserialize;
20#[cfg(any(test, feature = "diagnostics"))]
21use std::cell::Cell;
22use std::collections::{BTreeMap as HeapBTreeMap, BTreeSet};
23use std::ops::Bound;
24
#[cfg(test)]
thread_local! {
    // Test-only, per-thread count of journaled snapshot materializations.
    // Incremented by `record_journaled_snapshot_call`, reset and read through the
    // `*_for_tests` helpers below.
    static JOURNALED_SNAPSHOT_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
}
29
#[cfg(feature = "diagnostics")]
thread_local! {
    // Per-thread count of `IndexStore::get` calls (hits and misses alike).
    static INDEX_STORE_GET_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
    // Per-thread count of range-scan probes reported via `IndexStore::record_range_scan_call`.
    static INDEX_STORE_RANGE_SCAN_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
    // Per-thread count of entries handed to traversal visitors.
    static INDEX_STORE_ENTRY_READ_COUNT: Cell<u64> = const { Cell::new(0) };
}
36
/// Add one (saturating) to the thread-local count of index-store point lookups.
#[cfg(feature = "diagnostics")]
fn record_index_store_get_call() {
    INDEX_STORE_GET_CALL_COUNT.with(|calls| calls.set(calls.get().saturating_add(1)));
}
43
/// Add one (saturating) to the thread-local count of index range-scan probes.
#[cfg(feature = "diagnostics")]
fn record_index_store_range_scan_call() {
    INDEX_STORE_RANGE_SCAN_CALL_COUNT.with(|calls| calls.set(calls.get().saturating_add(1)));
}
50
/// Add one (saturating) to the thread-local count of entries yielded to visitors.
#[cfg(feature = "diagnostics")]
fn record_index_store_entry_read() {
    INDEX_STORE_ENTRY_READ_COUNT.with(|reads| reads.set(reads.get().saturating_add(1)));
}
57
58fn visit_index_store_entry<E>(
59    key: &RawIndexStoreKey,
60    value: &IndexEntryValue,
61    visit: &mut impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<bool, E>,
62) -> Result<bool, E> {
63    #[cfg(feature = "diagnostics")]
64    record_index_store_entry_read();
65
66    visit(key, value)
67}
68
/// Test-only: add one (saturating) to the journaled snapshot materialization count.
#[cfg(test)]
fn record_journaled_snapshot_call() {
    JOURNALED_SNAPSHOT_CALL_COUNT.with(|calls| calls.set(calls.get().saturating_add(1)));
}
75
/// Test-only: zero the journaled snapshot materialization count for this thread.
#[cfg(test)]
fn reset_journaled_snapshot_call_count_for_tests() {
    JOURNALED_SNAPSHOT_CALL_COUNT.with(|calls| {
        calls.set(0);
    });
}
80
/// Test-only: read the journaled snapshot materialization count for this thread.
#[cfg(test)]
fn journaled_snapshot_call_count_for_tests() -> u64 {
    JOURNALED_SNAPSHOT_CALL_COUNT.with(Cell::get)
}
85
//
// IndexState
//
// Explicit lifecycle visibility state for one index store.
// Visibility matters because planner-visible indexes must already be complete:
// the index contents are fully built and query-visible for reads.
// `Ready` is the default state.
//
#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
pub enum IndexState {
    // Set by `IndexStore::mark_building`: build in progress, not planner-visible.
    Building,
    // Set by `IndexStore::mark_ready`: fully built and planner-visible.
    #[default]
    Ready,
    // Set by `IndexStore::mark_dropping`: being dropped, not planner-visible.
    Dropping,
}
100
101impl IndexState {
102    /// Return the stable lowercase text label for this lifecycle state.
103    #[must_use]
104    pub const fn as_str(self) -> &'static str {
105        match self {
106            Self::Building => "building",
107            Self::Ready => "ready",
108            Self::Dropping => "dropping",
109        }
110    }
111}
112
///
/// IndexStore
///
/// Thin persistence wrapper over one journaled or heap BTreeMap.
///
/// Invariant: callers provide already-validated `RawIndexStoreKey`/`IndexEntryValue`.
///

pub struct IndexStore {
    // Entry storage: either a plain heap map or a journaled stable map with an overlay.
    pub(super) backend: IndexStoreBackend,
    // Store-local mutation counter; bumped (saturating) by insert/remove/clear/fold.
    generation: u64,
    // Lifecycle visibility state, changed only through the `mark_*` methods.
    state: IndexState,
    // Prefix-count metadata kept in step with insert/remove and rebuilt on fold.
    prefix_cardinality: IndexPrefixCardinality,
}
127
// Storage backing for one `IndexStore`.
pub(super) enum IndexStoreBackend {
    // Volatile backend: all entries live in one heap map.
    Heap(HeapBTreeMap<RawIndexStoreKey, IndexEntryValue>),
    // Journaled backend: reads consult `tombstones`, then `live`, then `canonical`.
    Journaled {
        // Stable-memory base map; rewritten only by `fold_journaled_materialized_view`.
        canonical:
            StableBTreeMap<RawIndexStoreKey, IndexEntryValue, VirtualMemory<DefaultMemoryImpl>>,
        // Heap overlay holding entries written since the last fold.
        live: HeapBTreeMap<RawIndexStoreKey, IndexEntryValue>,
        // Keys removed since the last fold; they hide matching canonical entries.
        tombstones: BTreeSet<RawIndexStoreKey>,
    },
}
137
/// Control-flow result for index-store traversal visitors.
///
/// Returned by the visitor passed to `IndexStore::visit_entries` after each entry.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) enum IndexStoreVisit {
    /// Keep traversing with the next entry.
    Continue,
    /// End the traversal after the current entry.
    Stop,
}
144
145impl IndexStoreVisit {
146    const fn should_stop(self) -> bool {
147        matches!(self, Self::Stop)
148    }
149}
150
151impl IndexStore {
152    /// Initialize a volatile heap-backed index store.
153    #[must_use]
154    pub const fn init_heap() -> Self {
155        Self {
156            backend: IndexStoreBackend::Heap(HeapBTreeMap::new()),
157            generation: 0,
158            state: IndexState::Ready,
159            prefix_cardinality: IndexPrefixCardinality::synchronized_empty(),
160        }
161    }
162
163    /// Initialize a journaled cached-stable index store.
164    ///
165    /// Normal writes update only the live materialized projection. The
166    /// canonical stable index is updated by future fold/rebuild paths.
167    #[must_use]
168    pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
169        let mut store = Self {
170            backend: IndexStoreBackend::Journaled {
171                canonical: StableBTreeMap::init(memory),
172                live: HeapBTreeMap::new(),
173                tombstones: BTreeSet::new(),
174            },
175            generation: 0,
176            state: IndexState::Ready,
177            prefix_cardinality: IndexPrefixCardinality::synchronized_empty(),
178        };
179        store.rebuild_prefix_cardinality_from_entries(Some(0));
180        store
181    }
182
183    /// Visit all index entries in canonical store order without exposing the
184    /// backing stable-map iterator.
185    pub(in crate::db) fn visit_entries<E>(
186        &self,
187        mut visitor: impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<IndexStoreVisit, E>,
188    ) -> Result<(), E> {
189        match &self.backend {
190            IndexStoreBackend::Heap(map) => {
191                for (key, value) in map {
192                    #[cfg(feature = "diagnostics")]
193                    record_index_store_entry_read();
194
195                    if visitor(key, value)?.should_stop() {
196                        return Ok(());
197                    }
198                }
199            }
200            IndexStoreBackend::Journaled {
201                canonical: _,
202                live: _,
203                tombstones: _,
204            } => self.visit_journaled_entries_in_range(
205                (&Bound::Unbounded, &Bound::Unbounded),
206                Direction::Asc,
207                |key, value| visitor(key, value).map(IndexStoreVisit::should_stop),
208            )?,
209        }
210
211        Ok(())
212    }
213
214    pub(in crate::db) fn get(&self, key: &RawIndexStoreKey) -> Option<IndexEntryValue> {
215        #[cfg(feature = "diagnostics")]
216        record_index_store_get_call();
217
218        match &self.backend {
219            IndexStoreBackend::Heap(map) => map.get(key).cloned(),
220            IndexStoreBackend::Journaled { .. } => Self::journaled_get(&self.backend, key),
221        }
222    }
223
224    pub fn len(&self) -> u64 {
225        match &self.backend {
226            IndexStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
227            IndexStoreBackend::Journaled { .. } => {
228                let mut count = 0_u64;
229                let _: Result<(), std::convert::Infallible> = self.visit_entries(|_key, _value| {
230                    count = count.saturating_add(1);
231                    Ok(IndexStoreVisit::Continue)
232                });
233                count
234            }
235        }
236    }
237
238    pub fn is_empty(&self) -> bool {
239        match &self.backend {
240            IndexStoreBackend::Heap(map) => map.is_empty(),
241            IndexStoreBackend::Journaled { .. } => {
242                let mut empty = true;
243                let _: Result<(), std::convert::Infallible> = self.visit_entries(|_key, _value| {
244                    empty = false;
245                    Ok(IndexStoreVisit::Stop)
246                });
247                empty
248            }
249        }
250    }
251
252    #[must_use]
253    pub(in crate::db) const fn generation(&self) -> u64 {
254        self.generation
255    }
256
257    /// Return the explicit lifecycle state for this index store.
258    #[must_use]
259    pub(in crate::db) const fn state(&self) -> IndexState {
260        self.state
261    }
262
263    /// Return an exact user-index prefix count when the index metadata is
264    /// synchronized with the caller's authoritative row-store generation.
265    #[must_use]
266    pub(in crate::db) fn exact_prefix_cardinality(
267        &self,
268        data_generation: u64,
269        key_kind: IndexKeyKind,
270        index_id: IndexId,
271        components: &[Vec<u8>],
272    ) -> Option<u64> {
273        self.prefix_cardinality
274            .exact_count(data_generation, key_kind, index_id, components)
275    }
276
277    /// Mark prefix-cardinality metadata synchronized with the authoritative
278    /// row-store generation after a committed row/index transition.
279    pub(in crate::db) const fn mark_prefix_cardinality_data_generation(&mut self, generation: u64) {
280        self.prefix_cardinality.mark_synchronized(generation);
281    }
282
283    /// Mark this index store as in-progress and therefore ineligible for
284    /// planner visibility until a full authoritative rebuild ends.
285    pub(in crate::db) const fn mark_building(&mut self) {
286        self.state = IndexState::Building;
287    }
288
289    /// Mark this index store as fully built and planner-visible again.
290    pub(in crate::db) const fn mark_ready(&mut self) {
291        self.state = IndexState::Ready;
292    }
293
294    /// Mark this index store as dropping and therefore not planner-visible.
295    pub(in crate::db) const fn mark_dropping(&mut self) {
296        self.state = IndexState::Dropping;
297    }
298
299    pub(crate) fn insert(
300        &mut self,
301        key: RawIndexStoreKey,
302        entry: IndexEntryValue,
303    ) -> Option<IndexEntryValue> {
304        let previous_journaled = if matches!(self.backend, IndexStoreBackend::Journaled { .. }) {
305            self.get(&key)
306        } else {
307            None
308        };
309        let cardinality_key = key.clone();
310        let previous = match &mut self.backend {
311            IndexStoreBackend::Heap(map) => map.insert(key, entry.clone()),
312            IndexStoreBackend::Journaled {
313                live, tombstones, ..
314            } => {
315                tombstones.remove(&key);
316                live.insert(key, entry.clone());
317                previous_journaled
318            }
319        };
320        self.prefix_cardinality
321            .apply_insert(&cardinality_key, previous.as_ref(), &entry);
322        self.bump_generation();
323        previous
324    }
325
326    pub(crate) fn remove(&mut self, key: &RawIndexStoreKey) -> Option<IndexEntryValue> {
327        let previous_journaled = if matches!(self.backend, IndexStoreBackend::Journaled { .. }) {
328            self.get(key)
329        } else {
330            None
331        };
332        let previous = match &mut self.backend {
333            IndexStoreBackend::Heap(map) => map.remove(key),
334            IndexStoreBackend::Journaled {
335                live, tombstones, ..
336            } => {
337                live.remove(key);
338                tombstones.insert(key.clone());
339                previous_journaled
340            }
341        };
342        self.prefix_cardinality.apply_remove(key, previous.as_ref());
343        self.bump_generation();
344        previous
345    }
346
347    pub fn clear(&mut self) {
348        match &mut self.backend {
349            IndexStoreBackend::Heap(map) => map.clear(),
350            IndexStoreBackend::Journaled {
351                canonical,
352                live,
353                tombstones,
354            } => {
355                live.clear();
356                tombstones.clear();
357                for entry in canonical.iter() {
358                    tombstones.insert(entry.key().clone());
359                }
360            }
361        }
362        self.prefix_cardinality.clear_unsynchronized();
363        self.bump_generation();
364    }
365
366    /// Fold the current journaled materialized index view into the canonical
367    /// stable base and clear volatile projection state.
368    pub(in crate::db) fn fold_journaled_materialized_view(
369        &mut self,
370    ) -> Result<(), crate::error::InternalError> {
371        let entries = Self::journaled_entries_snapshot_for_fold(&self.backend);
372        let IndexStoreBackend::Journaled {
373            canonical,
374            live,
375            tombstones,
376        } = &mut self.backend
377        else {
378            return Err(crate::error::InternalError::store_invariant());
379        };
380
381        canonical.clear_new();
382        for (key, value) in entries {
383            canonical.insert(key, value);
384        }
385        live.clear();
386        tombstones.clear();
387        let data_generation = self.prefix_cardinality.synchronized_generation();
388        self.rebuild_prefix_cardinality_from_entries(data_generation);
389        self.bump_generation();
390
391        Ok(())
392    }
393
394    /// Sum of bytes used by all stored index entries.
395    pub fn memory_bytes(&self) -> u64 {
396        let mut bytes = 0u64;
397        let _: Result<(), std::convert::Infallible> = self.visit_entries(|key, value| {
398            bytes = bytes.saturating_add(key.as_bytes().len() as u64 + value.len() as u64);
399            Ok(IndexStoreVisit::Continue)
400        });
401        bytes
402    }
403
404    /// Return the monotonic perf-only count of index-entry fetches seen by this process.
405    #[cfg(feature = "diagnostics")]
406    pub(in crate::db) fn current_get_call_count() -> u64 {
407        INDEX_STORE_GET_CALL_COUNT.with(Cell::get)
408    }
409
410    /// Return the monotonic perf-only count of index range traversal probes seen by this process.
411    #[cfg(feature = "diagnostics")]
412    pub(in crate::db) fn current_range_scan_call_count() -> u64 {
413        INDEX_STORE_RANGE_SCAN_CALL_COUNT.with(Cell::get)
414    }
415
416    /// Return the monotonic perf-only count of index entries yielded by traversal.
417    #[cfg(feature = "diagnostics")]
418    pub(in crate::db) fn current_entry_read_count() -> u64 {
419        INDEX_STORE_ENTRY_READ_COUNT.with(Cell::get)
420    }
421
422    #[cfg(feature = "diagnostics")]
423    pub(in crate::db::index) fn record_range_scan_call() {
424        record_index_store_range_scan_call();
425    }
426
427    const fn bump_generation(&mut self) {
428        self.generation = self.generation.saturating_add(1);
429    }
430
431    fn rebuild_prefix_cardinality_from_entries(&mut self, data_generation: Option<u64>) {
432        self.prefix_cardinality.clear_unsynchronized();
433        let entries = Self::entries_snapshot_for_cardinality(&self.backend);
434        for (key, value) in &entries {
435            self.prefix_cardinality.apply_insert(key, None, value);
436        }
437        if let Some(data_generation) = data_generation {
438            self.prefix_cardinality.mark_synchronized(data_generation);
439        }
440    }
441
442    fn entries_snapshot_for_cardinality(
443        backend: &IndexStoreBackend,
444    ) -> HeapBTreeMap<RawIndexStoreKey, IndexEntryValue> {
445        match backend {
446            IndexStoreBackend::Heap(map) => map.clone(),
447            IndexStoreBackend::Journaled { .. } => {
448                Self::journaled_entries_snapshot_for_fold(backend)
449            }
450        }
451    }
452
453    #[cfg(test)]
454    #[must_use]
455    pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
456        match &self.backend {
457            IndexStoreBackend::Journaled { canonical: map, .. } => map.len(),
458            IndexStoreBackend::Heap(_) => 0,
459        }
460    }
461
462    fn journaled_get(
463        backend: &IndexStoreBackend,
464        key: &RawIndexStoreKey,
465    ) -> Option<IndexEntryValue> {
466        let IndexStoreBackend::Journaled {
467            canonical,
468            live,
469            tombstones,
470        } = backend
471        else {
472            return None;
473        };
474
475        if tombstones.contains(key) {
476            return None;
477        }
478        live.get(key).cloned().or_else(|| canonical.get(key))
479    }
480
481    pub(super) fn journaled_entries_snapshot_for_fold(
482        backend: &IndexStoreBackend,
483    ) -> HeapBTreeMap<RawIndexStoreKey, IndexEntryValue> {
484        #[cfg(test)]
485        record_journaled_snapshot_call();
486
487        let IndexStoreBackend::Journaled {
488            canonical,
489            live,
490            tombstones,
491        } = backend
492        else {
493            return HeapBTreeMap::new();
494        };
495
496        let mut entries = HeapBTreeMap::new();
497        for entry in canonical.iter() {
498            let key = entry.key().clone();
499            if !tombstones.contains(&key) {
500                entries.insert(key, entry.value());
501            }
502        }
503        for (key, value) in live {
504            if !tombstones.contains(key) {
505                entries.insert(key.clone(), value.clone());
506            }
507        }
508
509        entries
510    }
511
512    pub(super) fn visit_journaled_entries_in_range<E>(
513        &self,
514        bounds: (&Bound<RawIndexStoreKey>, &Bound<RawIndexStoreKey>),
515        direction: Direction,
516        mut visit: impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<bool, E>,
517    ) -> Result<(), E> {
518        let IndexStoreBackend::Journaled {
519            canonical,
520            live,
521            tombstones,
522        } = &self.backend
523        else {
524            return Ok(());
525        };
526
527        let lower = bounds.0.clone();
528        let upper = bounds.1.clone();
529        match direction {
530            Direction::Asc if canonical.is_empty() => {
531                for (key, value) in live.range((lower, upper)) {
532                    if visit_index_store_entry(key, value, &mut visit)? {
533                        return Ok(());
534                    }
535                }
536            }
537            Direction::Desc if canonical.is_empty() => {
538                for (key, value) in live.range((lower, upper)).rev() {
539                    if visit_index_store_entry(key, value, &mut visit)? {
540                        return Ok(());
541                    }
542                }
543            }
544            Direction::Asc if live.is_empty() && tombstones.is_empty() => {
545                for entry in canonical.range((lower, upper)) {
546                    if visit_index_store_entry(entry.key(), &entry.value(), &mut visit)? {
547                        return Ok(());
548                    }
549                }
550            }
551            Direction::Desc if live.is_empty() && tombstones.is_empty() => {
552                for entry in canonical.range((lower, upper)).rev() {
553                    if visit_index_store_entry(entry.key(), &entry.value(), &mut visit)? {
554                        return Ok(());
555                    }
556                }
557            }
558            Direction::Asc => {
559                visit_ordered_overlay(
560                    canonical.range((lower.clone(), upper.clone())),
561                    live.range((lower, upper)),
562                    direction,
563                    |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
564                    |canonical_entry| !tombstones.contains(canonical_entry.key()),
565                    |live_entry| !tombstones.contains(live_entry.0),
566                    |entry| {
567                        let should_stop = match entry {
568                            OrderedOverlayEntry::Canonical(canonical_entry) => {
569                                visit_index_store_entry(
570                                    canonical_entry.key(),
571                                    &canonical_entry.value(),
572                                    &mut visit,
573                                )?
574                            }
575                            OrderedOverlayEntry::Live((key, value)) => {
576                                visit_index_store_entry(key, value, &mut visit)?
577                            }
578                        };
579                        Ok(if should_stop {
580                            OrderedOverlayVisit::Stop
581                        } else {
582                            OrderedOverlayVisit::Continue
583                        })
584                    },
585                )?;
586            }
587            Direction::Desc => {
588                visit_ordered_overlay(
589                    canonical.range((lower.clone(), upper.clone())).rev(),
590                    live.range((lower, upper)).rev(),
591                    direction,
592                    |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
593                    |canonical_entry| !tombstones.contains(canonical_entry.key()),
594                    |live_entry| !tombstones.contains(live_entry.0),
595                    |entry| {
596                        let should_stop = match entry {
597                            OrderedOverlayEntry::Canonical(canonical_entry) => {
598                                visit_index_store_entry(
599                                    canonical_entry.key(),
600                                    &canonical_entry.value(),
601                                    &mut visit,
602                                )?
603                            }
604                            OrderedOverlayEntry::Live((key, value)) => {
605                                visit_index_store_entry(key, value, &mut visit)?
606                            }
607                        };
608                        Ok(if should_stop {
609                            OrderedOverlayVisit::Stop
610                        } else {
611                            OrderedOverlayVisit::Continue
612                        })
613                    },
614                )?;
615            }
616        }
617
618        Ok(())
619    }
620}
621
622#[cfg(test)]
623mod tests {
624    use super::*;
625    use crate::{
626        db::{
627            direction::Direction,
628            index::{IndexId, IndexKey, IndexKeyKind},
629            key_taxonomy::{PrimaryKeyComponent, PrimaryKeyValue},
630        },
631        testing::test_memory,
632        traits::Storable,
633        types::EntityTag,
634    };
635    use std::{borrow::Cow, convert::Infallible};
636
637    fn raw_key(value: u8) -> RawIndexStoreKey {
638        <RawIndexStoreKey as Storable>::from_bytes(Cow::Owned(vec![value]))
639    }
640
641    fn indexed_raw_key(
642        index_id: &IndexId,
643        components: Vec<Vec<u8>>,
644        primary_key: u64,
645    ) -> RawIndexStoreKey {
646        indexed_raw_key_with_kind(index_id, IndexKeyKind::User, components, primary_key)
647    }
648
649    fn indexed_raw_key_with_kind(
650        index_id: &IndexId,
651        key_kind: IndexKeyKind,
652        components: Vec<Vec<u8>>,
653        primary_key: u64,
654    ) -> RawIndexStoreKey {
655        IndexKey::new_from_components_with_primary_key_value(
656            index_id,
657            key_kind,
658            components.as_slice(),
659            &PrimaryKeyValue::from(PrimaryKeyComponent::Nat64(primary_key)),
660        )
661        .to_raw()
662    }
663
664    fn malformed_index_entry_value() -> IndexEntryValue {
665        <IndexEntryValue as Storable>::from_bytes(Cow::Owned(vec![0xFF]))
666    }
667
668    fn missing_index_entry_value() -> IndexEntryValue {
669        <IndexEntryValue as Storable>::from_bytes(Cow::Owned(vec![1]))
670    }
671
672    #[test]
673    fn index_prefix_cardinality_requires_explicit_data_generation_sync() {
674        let index_id = IndexId::new(EntityTag::new(0xCA7D), 1);
675        let collection = b"collection-a".to_vec();
676        let draft = b"Draft".to_vec();
677        let review = b"Review".to_vec();
678        let mut store = IndexStore::init_heap();
679
680        store.insert(
681            indexed_raw_key(&index_id, vec![collection.clone(), draft.clone()], 1),
682            IndexEntryValue::presence(),
683        );
684        store.insert(
685            indexed_raw_key(&index_id, vec![collection.clone(), draft.clone()], 2),
686            IndexEntryValue::presence(),
687        );
688        store.insert(
689            indexed_raw_key(&index_id, vec![collection.clone(), review.clone()], 3),
690            IndexEntryValue::presence(),
691        );
692
693        assert_eq!(
694            store.exact_prefix_cardinality(
695                0,
696                IndexKeyKind::User,
697                index_id,
698                std::slice::from_ref(&collection),
699            ),
700            None,
701            "raw index mutations must not be trusted until row generation sync is stamped",
702        );
703
704        store.mark_prefix_cardinality_data_generation(7);
705
706        assert_eq!(
707            store.exact_prefix_cardinality(
708                7,
709                IndexKeyKind::User,
710                index_id,
711                std::slice::from_ref(&collection),
712            ),
713            Some(3),
714        );
715        assert_eq!(
716            store.exact_prefix_cardinality(
717                7,
718                IndexKeyKind::User,
719                index_id,
720                &[collection.clone(), draft],
721            ),
722            Some(2),
723        );
724        assert_eq!(
725            store.exact_prefix_cardinality(8, IndexKeyKind::User, index_id, &[collection, review],),
726            None,
727            "row generation drift should force the caller to use the existing-row fallback",
728        );
729    }
730
    // System-kind index writes and removals (well-formed or malformed) must leave
    // a synchronized user-prefix count usable, while a user-prefix change must
    // invalidate it until the next generation stamp.
    #[test]
    fn index_prefix_cardinality_ignores_system_index_mutations() {
        let user_index_id = IndexId::new(EntityTag::new(0xCA7D), 1);
        let system_index_id = IndexId::new(EntityTag::new(0xCA7D), 2);
        let collection = b"collection-a".to_vec();
        let draft = b"Draft".to_vec();
        let system_component = b"reverse-edge".to_vec();
        let mut store = IndexStore::init_heap();

        // Baseline: one user entry, stamped at row generation 7.
        store.insert(
            indexed_raw_key(&user_index_id, vec![collection.clone(), draft.clone()], 1),
            IndexEntryValue::presence(),
        );
        store.mark_prefix_cardinality_data_generation(7);

        assert_eq!(
            store.exact_prefix_cardinality(
                7,
                IndexKeyKind::User,
                user_index_id,
                &[collection.clone(), draft.clone()],
            ),
            Some(1),
        );

        // Insert, then remove, a well-formed system-kind entry.
        let system_key = indexed_raw_key_with_kind(
            &system_index_id,
            IndexKeyKind::System,
            vec![system_component],
            1,
        );
        store.insert(system_key.clone(), IndexEntryValue::presence());
        assert_eq!(
            store.exact_prefix_cardinality(
                7,
                IndexKeyKind::User,
                user_index_id,
                &[collection.clone(), draft.clone()],
            ),
            Some(1),
            "system index writes must not invalidate synchronized user-prefix cardinality",
        );

        store.remove(&system_key);
        assert_eq!(
            store.exact_prefix_cardinality(
                7,
                IndexKeyKind::User,
                user_index_id,
                &[collection.clone(), draft.clone()],
            ),
            Some(1),
            "system index removals must not invalidate synchronized user-prefix cardinality",
        );

        // Same again with a malformed payload under a system-kind key.
        let malformed_system_key = indexed_raw_key_with_kind(
            &system_index_id,
            IndexKeyKind::System,
            vec![b"malformed-reverse-edge".to_vec()],
            2,
        );
        store.insert(malformed_system_key.clone(), malformed_index_entry_value());
        assert_eq!(
            store.exact_prefix_cardinality(
                7,
                IndexKeyKind::User,
                user_index_id,
                &[collection.clone(), draft.clone()],
            ),
            Some(1),
            "malformed system index payloads must not invalidate user-prefix cardinality",
        );

        store.remove(&malformed_system_key);
        assert_eq!(
            store.exact_prefix_cardinality(
                7,
                IndexKeyKind::User,
                user_index_id,
                &[collection.clone(), draft],
            ),
            Some(1),
            "malformed system index removals must not invalidate user-prefix cardinality",
        );

        // A real user-prefix change without a new stamp must stop answering.
        let review = b"Review".to_vec();
        store.insert(
            indexed_raw_key(&user_index_id, vec![collection.clone(), review.clone()], 2),
            IndexEntryValue::presence(),
        );
        assert_eq!(
            store.exact_prefix_cardinality(
                7,
                IndexKeyKind::User,
                user_index_id,
                &[collection, review]
            ),
            None,
            "user-prefix count changes must still require a fresh row-generation stamp",
        );
    }
832
833    #[test]
834    fn index_prefix_cardinality_ignores_missing_user_index_mutations() {
835        let index_id = IndexId::new(EntityTag::new(0xCA7D), 1);
836        let collection = b"collection-a".to_vec();
837        let draft = b"Draft".to_vec();
838        let mut store = IndexStore::init_heap();
839
840        store.insert(
841            indexed_raw_key(&index_id, vec![collection.clone(), draft.clone()], 1),
842            IndexEntryValue::presence(),
843        );
844        store.mark_prefix_cardinality_data_generation(7);
845
846        let stale_key = indexed_raw_key(&index_id, vec![collection.clone(), draft.clone()], 2);
847        store.insert(stale_key.clone(), missing_index_entry_value());
848        assert_eq!(
849            store.exact_prefix_cardinality(
850                7,
851                IndexKeyKind::User,
852                index_id,
853                &[collection.clone(), draft.clone()],
854            ),
855            Some(1),
856            "missing user index entries must not affect synchronized prefix cardinality",
857        );
858
859        store.remove(&stale_key);
860        assert_eq!(
861            store.exact_prefix_cardinality(7, IndexKeyKind::User, index_id, &[collection, draft],),
862            Some(1),
863            "missing user index removals must not affect synchronized prefix cardinality",
864        );
865    }
866
867    #[cfg(feature = "diagnostics")]
868    #[test]
869    fn index_store_diagnostic_counters_record_gets_range_scans_and_entry_reads() {
870        let mut store = IndexStore::init_heap();
871        store.insert(raw_key(7), IndexEntryValue::presence());
872        store.insert(raw_key(9), IndexEntryValue::presence());
873
874        let gets_before = IndexStore::current_get_call_count();
875        assert_eq!(store.get(&raw_key(7)), Some(IndexEntryValue::presence()));
876        assert_eq!(store.get(&raw_key(8)), None);
877
878        assert_eq!(
879            IndexStore::current_get_call_count().saturating_sub(gets_before),
880            2,
881            "diagnostic index-store get counter should count both hit and miss reads",
882        );
883
884        let range_scans_before = IndexStore::current_range_scan_call_count();
885        let lower = Bound::Included(raw_key(7));
886        let upper = Bound::Included(raw_key(9));
887        store
888            .visit_raw_entries_in_range((&lower, &upper), Direction::Asc, |_key, _entry| Ok(false))
889            .expect("raw index range visit should succeed");
890
891        assert_eq!(
892            IndexStore::current_range_scan_call_count().saturating_sub(range_scans_before),
893            1,
894            "diagnostic index-store range-scan counter should count one range traversal probe",
895        );
896
897        let entries_before = IndexStore::current_entry_read_count();
898        store
899            .visit_entries(|_key, _entry| Ok::<_, Infallible>(IndexStoreVisit::Continue))
900            .expect("index entry visit should succeed");
901
902        assert_eq!(
903            IndexStore::current_entry_read_count().saturating_sub(entries_before),
904            2,
905            "diagnostic index-store entry counter should count yielded traversal entries",
906        );
907    }
908
909    #[test]
910    fn journaled_mixed_index_range_traversal_streams_without_snapshot() {
911        let mut store = IndexStore::init_journaled(test_memory(93));
912        for value in [1_u8, 3, 5] {
913            store.insert(raw_key(value), IndexEntryValue::presence());
914        }
915        store
916            .fold_journaled_materialized_view()
917            .expect("canonical index seed should fold");
918
919        store.insert(raw_key(0), IndexEntryValue::presence());
920        store.insert(raw_key(4), IndexEntryValue::presence());
921        store.insert(raw_key(5), IndexEntryValue::presence());
922        store.remove(&raw_key(1));
923
924        let lower = Bound::Included(raw_key(0));
925        let upper = Bound::Included(raw_key(5));
926
927        reset_journaled_snapshot_call_count_for_tests();
928        let mut asc = Vec::new();
929        store
930            .visit_journaled_entries_in_range((&lower, &upper), Direction::Asc, |key, _value| {
931                asc.push(key.as_bytes()[0]);
932                Ok::<_, Infallible>(asc.len() == 2)
933            })
934            .expect("asc journaled index range traversal should succeed");
935        assert_eq!(asc, vec![0, 3]);
936        assert_eq!(
937            journaled_snapshot_call_count_for_tests(),
938            0,
939            "mixed journaled index range traversal should preserve early stop without materializing a snapshot",
940        );
941
942        reset_journaled_snapshot_call_count_for_tests();
943        let mut desc = Vec::new();
944        store
945            .visit_journaled_entries_in_range((&lower, &upper), Direction::Desc, |key, _value| {
946                desc.push(key.as_bytes()[0]);
947                Ok::<_, Infallible>(desc.len() == 2)
948            })
949            .expect("desc journaled index range traversal should succeed");
950        assert_eq!(desc, vec![5, 4]);
951        assert_eq!(
952            journaled_snapshot_call_count_for_tests(),
953            0,
954            "mixed reverse journaled index range traversal should preserve early stop without materializing a snapshot",
955        );
956    }
957}