Skip to main content

icydb_core/db/index/
store.rs

1//! Module: index::store
2//! Responsibility: journaled-or-heap index-entry storage behind the index-store boundary.
3//! Does not own: range-scan resolution, continuation semantics, or predicate execution.
4//! Boundary: scan/executor layers depend on this storage boundary.
5
6use crate::db::{
7    direction::Direction,
8    index::{
9        IndexEntryValue, IndexId, IndexKeyKind, cardinality::IndexPrefixCardinality,
10        key::RawIndexStoreKey,
11    },
12    ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
13};
14
15use candid::CandidType;
16use ic_memory::stable_structures::{
17    BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
18};
19use serde::Deserialize;
20#[cfg(any(test, feature = "diagnostics"))]
21use std::cell::Cell;
22use std::collections::{BTreeMap as HeapBTreeMap, BTreeSet};
23use std::ops::Bound;
24
#[cfg(test)]
thread_local! {
    // Test-only, per-thread tally of journaled snapshot materializations.
    // Bumped by `record_journaled_snapshot_call`, which
    // `IndexStore::journaled_entries_snapshot_for_fold` invokes on entry.
    static JOURNALED_SNAPSHOT_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
}
29
#[cfg(feature = "diagnostics")]
thread_local! {
    // Per-thread diagnostics counters. Each one only ever grows (saturating)
    // through its matching `record_index_store_*` helper below.

    // Number of `IndexStore::get` calls.
    static INDEX_STORE_GET_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
    // Number of range-scan probes reported via `IndexStore::record_range_scan_call`.
    static INDEX_STORE_RANGE_SCAN_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
    // Number of entries handed to a traversal visitor.
    static INDEX_STORE_ENTRY_READ_COUNT: Cell<u64> = const { Cell::new(0) };
    // Number of exact prefix-cardinality lookups (single and summed).
    static INDEX_STORE_PREFIX_CARDINALITY_LOOKUP_COUNT: Cell<u64> = const { Cell::new(0) };
}
37
/// Add one (saturating) to this thread's tally of index-store point lookups.
#[cfg(feature = "diagnostics")]
fn record_index_store_get_call() {
    INDEX_STORE_GET_CALL_COUNT.with(|calls| {
        let bumped = calls.get().saturating_add(1);
        calls.set(bumped);
    });
}
44
/// Add one (saturating) to this thread's tally of index range-scan probes.
#[cfg(feature = "diagnostics")]
fn record_index_store_range_scan_call() {
    INDEX_STORE_RANGE_SCAN_CALL_COUNT.with(|scans| {
        let bumped = scans.get().saturating_add(1);
        scans.set(bumped);
    });
}
51
/// Add one (saturating) to this thread's tally of entries yielded to visitors.
#[cfg(feature = "diagnostics")]
fn record_index_store_entry_read() {
    INDEX_STORE_ENTRY_READ_COUNT.with(|reads| {
        let bumped = reads.get().saturating_add(1);
        reads.set(bumped);
    });
}
58
/// Add one (saturating) to this thread's tally of exact prefix-cardinality lookups.
#[cfg(feature = "diagnostics")]
fn record_index_store_prefix_cardinality_lookup() {
    INDEX_STORE_PREFIX_CARDINALITY_LOOKUP_COUNT.with(|lookups| {
        let bumped = lookups.get().saturating_add(1);
        lookups.set(bumped);
    });
}
65
66fn visit_index_store_entry<E>(
67    key: &RawIndexStoreKey,
68    value: &IndexEntryValue,
69    visit: &mut impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<bool, E>,
70) -> Result<bool, E> {
71    #[cfg(feature = "diagnostics")]
72    record_index_store_entry_read();
73
74    visit(key, value)
75}
76
/// Test-only: add one (saturating) to this thread's journaled-snapshot tally.
#[cfg(test)]
fn record_journaled_snapshot_call() {
    JOURNALED_SNAPSHOT_CALL_COUNT.with(|snapshots| {
        let bumped = snapshots.get().saturating_add(1);
        snapshots.set(bumped);
    });
}
83
/// Test-only: zero this thread's journaled-snapshot tally.
#[cfg(test)]
fn reset_journaled_snapshot_call_count_for_tests() {
    JOURNALED_SNAPSHOT_CALL_COUNT.set(0);
}
88
/// Test-only: read this thread's journaled-snapshot tally.
#[cfg(test)]
fn journaled_snapshot_call_count_for_tests() -> u64 {
    JOURNALED_SNAPSHOT_CALL_COUNT.get()
}
93
///
/// IndexState
///
/// Explicit lifecycle visibility state for one index store.
/// Visibility matters because planner-visible indexes must already be complete:
/// the index contents are fully built and query-visible for reads.
///
#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
pub enum IndexState {
    /// Build in progress; set by `IndexStore::mark_building`.
    Building,
    /// Fully built and planner-visible. This is the `Default` state and the
    /// state both `IndexStore` constructors start in.
    #[default]
    Ready,
    /// Being dropped; set by `IndexStore::mark_dropping`.
    Dropping,
}
108
109impl IndexState {
110    /// Return the stable lowercase text label for this lifecycle state.
111    #[must_use]
112    pub const fn as_str(self) -> &'static str {
113        match self {
114            Self::Building => "building",
115            Self::Ready => "ready",
116            Self::Dropping => "dropping",
117        }
118    }
119}
120
///
/// IndexStore
///
/// Thin persistence wrapper over one journaled or heap BTreeMap.
///
/// Invariant: callers provide already-validated `RawIndexStoreKey`/`IndexEntryValue`.
///
pub struct IndexStore {
    // Entry storage: a plain heap map, or a stable map plus volatile overlay.
    pub(super) backend: IndexStoreBackend,
    // Mutation counter; bumped (saturating) by insert/remove/clear/fold.
    generation: u64,
    // Lifecycle visibility state reported by `state()`.
    state: IndexState,
    // Prefix-count metadata kept in step with entry mutations; answers the
    // `exact_prefix_cardinality*` lookups.
    prefix_cardinality: IndexPrefixCardinality,
}
135
/// Physical storage behind one `IndexStore`.
pub(super) enum IndexStoreBackend {
    /// Volatile store: every entry lives in one in-memory ordered map.
    Heap(HeapBTreeMap<RawIndexStoreKey, IndexEntryValue>),
    /// Stable base map plus an in-memory overlay of pending changes.
    Journaled {
        // Stable-memory base; only rewritten by the fold path in this file.
        canonical:
            StableBTreeMap<RawIndexStoreKey, IndexEntryValue, VirtualMemory<DefaultMemoryImpl>>,
        // Entries written since the last fold; consulted before `canonical` on reads.
        live: HeapBTreeMap<RawIndexStoreKey, IndexEntryValue>,
        // Keys removed since the last fold; a tombstoned key reads as absent.
        tombstones: BTreeSet<RawIndexStoreKey>,
    },
}
145
/// Control-flow result for index-store traversal visitors.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) enum IndexStoreVisit {
    /// Keep traversing; the next entry (if any) will be visited.
    Continue,
    /// End the traversal after the current entry.
    Stop,
}
152
153impl IndexStoreVisit {
154    const fn should_stop(self) -> bool {
155        matches!(self, Self::Stop)
156    }
157}
158
159impl IndexStore {
160    /// Initialize a volatile heap-backed index store.
161    #[must_use]
162    pub const fn init_heap() -> Self {
163        Self {
164            backend: IndexStoreBackend::Heap(HeapBTreeMap::new()),
165            generation: 0,
166            state: IndexState::Ready,
167            prefix_cardinality: IndexPrefixCardinality::synchronized_empty(),
168        }
169    }
170
171    /// Initialize a journaled cached-stable index store.
172    ///
173    /// Normal writes update only the live materialized projection. The
174    /// canonical stable index is updated by future fold/rebuild paths.
175    #[must_use]
176    pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
177        let mut store = Self {
178            backend: IndexStoreBackend::Journaled {
179                canonical: StableBTreeMap::init(memory),
180                live: HeapBTreeMap::new(),
181                tombstones: BTreeSet::new(),
182            },
183            generation: 0,
184            state: IndexState::Ready,
185            prefix_cardinality: IndexPrefixCardinality::synchronized_empty(),
186        };
187        store.rebuild_prefix_cardinality_from_entries(Some(0));
188        store
189    }
190
191    /// Visit all index entries in canonical store order without exposing the
192    /// backing stable-map iterator.
193    pub(in crate::db) fn visit_entries<E>(
194        &self,
195        mut visitor: impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<IndexStoreVisit, E>,
196    ) -> Result<(), E> {
197        match &self.backend {
198            IndexStoreBackend::Heap(map) => {
199                for (key, value) in map {
200                    #[cfg(feature = "diagnostics")]
201                    record_index_store_entry_read();
202
203                    if visitor(key, value)?.should_stop() {
204                        return Ok(());
205                    }
206                }
207            }
208            IndexStoreBackend::Journaled {
209                canonical: _,
210                live: _,
211                tombstones: _,
212            } => self.visit_journaled_entries_in_range(
213                (&Bound::Unbounded, &Bound::Unbounded),
214                Direction::Asc,
215                |key, value| visitor(key, value).map(IndexStoreVisit::should_stop),
216            )?,
217        }
218
219        Ok(())
220    }
221
222    pub(in crate::db) fn get(&self, key: &RawIndexStoreKey) -> Option<IndexEntryValue> {
223        #[cfg(feature = "diagnostics")]
224        record_index_store_get_call();
225
226        match &self.backend {
227            IndexStoreBackend::Heap(map) => map.get(key).cloned(),
228            IndexStoreBackend::Journaled { .. } => Self::journaled_get(&self.backend, key),
229        }
230    }
231
232    pub fn len(&self) -> u64 {
233        match &self.backend {
234            IndexStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
235            IndexStoreBackend::Journaled { .. } => {
236                let mut count = 0_u64;
237                let _: Result<(), std::convert::Infallible> = self.visit_entries(|_key, _value| {
238                    count = count.saturating_add(1);
239                    Ok(IndexStoreVisit::Continue)
240                });
241                count
242            }
243        }
244    }
245
246    pub fn is_empty(&self) -> bool {
247        match &self.backend {
248            IndexStoreBackend::Heap(map) => map.is_empty(),
249            IndexStoreBackend::Journaled { .. } => {
250                let mut empty = true;
251                let _: Result<(), std::convert::Infallible> = self.visit_entries(|_key, _value| {
252                    empty = false;
253                    Ok(IndexStoreVisit::Stop)
254                });
255                empty
256            }
257        }
258    }
259
260    #[must_use]
261    pub(in crate::db) const fn generation(&self) -> u64 {
262        self.generation
263    }
264
265    /// Return the explicit lifecycle state for this index store.
266    #[must_use]
267    pub(in crate::db) const fn state(&self) -> IndexState {
268        self.state
269    }
270
271    /// Return an exact user-index prefix count when the index metadata is
272    /// synchronized with the caller's authoritative row-store generation.
273    #[must_use]
274    pub(in crate::db) fn exact_prefix_cardinality(
275        &self,
276        data_generation: u64,
277        key_kind: IndexKeyKind,
278        index_id: IndexId,
279        components: &[Vec<u8>],
280    ) -> Option<u64> {
281        #[cfg(feature = "diagnostics")]
282        record_index_store_prefix_cardinality_lookup();
283
284        self.prefix_cardinality
285            .exact_count(data_generation, key_kind, index_id, components)
286    }
287
288    /// Return the sum of exact prefix counts for prefixes on the same index
289    /// when synchronized metadata can prove all requested counts.
290    #[must_use]
291    pub(in crate::db) fn exact_prefix_cardinality_sum<'a>(
292        &self,
293        data_generation: u64,
294        key_kind: IndexKeyKind,
295        index_id: IndexId,
296        component_prefixes: impl IntoIterator<Item = &'a [Vec<u8>]>,
297        stop_after: Option<u64>,
298    ) -> Option<u64> {
299        #[cfg(feature = "diagnostics")]
300        record_index_store_prefix_cardinality_lookup();
301
302        self.prefix_cardinality.exact_count_sum(
303            data_generation,
304            key_kind,
305            index_id,
306            component_prefixes,
307            stop_after,
308        )
309    }
310
311    /// Mark prefix-cardinality metadata synchronized with the authoritative
312    /// row-store generation after a committed row/index transition.
313    pub(in crate::db) const fn mark_prefix_cardinality_data_generation(&mut self, generation: u64) {
314        self.prefix_cardinality.mark_synchronized(generation);
315    }
316
317    /// Mark this index store as in-progress and therefore ineligible for
318    /// planner visibility until a full authoritative rebuild ends.
319    pub(in crate::db) const fn mark_building(&mut self) {
320        self.state = IndexState::Building;
321    }
322
323    /// Mark this index store as fully built and planner-visible again.
324    pub(in crate::db) const fn mark_ready(&mut self) {
325        self.state = IndexState::Ready;
326    }
327
328    /// Mark this index store as dropping and therefore not planner-visible.
329    pub(in crate::db) const fn mark_dropping(&mut self) {
330        self.state = IndexState::Dropping;
331    }
332
333    pub(crate) fn insert(
334        &mut self,
335        key: RawIndexStoreKey,
336        entry: IndexEntryValue,
337    ) -> Option<IndexEntryValue> {
338        let previous_journaled = if matches!(self.backend, IndexStoreBackend::Journaled { .. }) {
339            self.get(&key)
340        } else {
341            None
342        };
343        let cardinality_key = key.clone();
344        let previous = match &mut self.backend {
345            IndexStoreBackend::Heap(map) => map.insert(key, entry.clone()),
346            IndexStoreBackend::Journaled {
347                live, tombstones, ..
348            } => {
349                tombstones.remove(&key);
350                live.insert(key, entry.clone());
351                previous_journaled
352            }
353        };
354        self.prefix_cardinality
355            .apply_insert(&cardinality_key, previous.as_ref(), &entry);
356        self.bump_generation();
357        previous
358    }
359
360    pub(crate) fn remove(&mut self, key: &RawIndexStoreKey) -> Option<IndexEntryValue> {
361        let previous_journaled = if matches!(self.backend, IndexStoreBackend::Journaled { .. }) {
362            self.get(key)
363        } else {
364            None
365        };
366        let previous = match &mut self.backend {
367            IndexStoreBackend::Heap(map) => map.remove(key),
368            IndexStoreBackend::Journaled {
369                live, tombstones, ..
370            } => {
371                live.remove(key);
372                tombstones.insert(key.clone());
373                previous_journaled
374            }
375        };
376        self.prefix_cardinality.apply_remove(key, previous.as_ref());
377        self.bump_generation();
378        previous
379    }
380
381    pub fn clear(&mut self) {
382        match &mut self.backend {
383            IndexStoreBackend::Heap(map) => map.clear(),
384            IndexStoreBackend::Journaled {
385                canonical,
386                live,
387                tombstones,
388            } => {
389                live.clear();
390                tombstones.clear();
391                for entry in canonical.iter() {
392                    tombstones.insert(entry.key().clone());
393                }
394            }
395        }
396        self.prefix_cardinality.clear_unsynchronized();
397        self.bump_generation();
398    }
399
400    /// Fold the current journaled materialized index view into the canonical
401    /// stable base and clear volatile projection state.
402    pub(in crate::db) fn fold_journaled_materialized_view(
403        &mut self,
404    ) -> Result<(), crate::error::InternalError> {
405        let entries = Self::journaled_entries_snapshot_for_fold(&self.backend);
406        let IndexStoreBackend::Journaled {
407            canonical,
408            live,
409            tombstones,
410        } = &mut self.backend
411        else {
412            return Err(crate::error::InternalError::store_invariant());
413        };
414
415        canonical.clear_new();
416        for (key, value) in entries {
417            canonical.insert(key, value);
418        }
419        live.clear();
420        tombstones.clear();
421        let data_generation = self.prefix_cardinality.synchronized_generation();
422        self.rebuild_prefix_cardinality_from_entries(data_generation);
423        self.bump_generation();
424
425        Ok(())
426    }
427
428    /// Sum of bytes used by all stored index entries.
429    pub fn memory_bytes(&self) -> u64 {
430        let mut bytes = 0u64;
431        let _: Result<(), std::convert::Infallible> = self.visit_entries(|key, value| {
432            bytes = bytes.saturating_add(key.as_bytes().len() as u64 + value.len() as u64);
433            Ok(IndexStoreVisit::Continue)
434        });
435        bytes
436    }
437
438    /// Return the monotonic perf-only count of index-entry fetches seen by this process.
439    #[cfg(feature = "diagnostics")]
440    pub(in crate::db) fn current_get_call_count() -> u64 {
441        INDEX_STORE_GET_CALL_COUNT.with(Cell::get)
442    }
443
444    /// Return the monotonic perf-only count of index range traversal probes seen by this process.
445    #[cfg(feature = "diagnostics")]
446    pub(in crate::db) fn current_range_scan_call_count() -> u64 {
447        INDEX_STORE_RANGE_SCAN_CALL_COUNT.with(Cell::get)
448    }
449
450    /// Return the monotonic perf-only count of index entries yielded by traversal.
451    #[cfg(feature = "diagnostics")]
452    pub(in crate::db) fn current_entry_read_count() -> u64 {
453        INDEX_STORE_ENTRY_READ_COUNT.with(Cell::get)
454    }
455
456    /// Return the monotonic perf-only count of exact prefix-cardinality probes.
457    #[cfg(all(test, feature = "diagnostics"))]
458    pub(in crate::db) fn current_prefix_cardinality_lookup_count() -> u64 {
459        INDEX_STORE_PREFIX_CARDINALITY_LOOKUP_COUNT.with(Cell::get)
460    }
461
462    #[cfg(feature = "diagnostics")]
463    pub(in crate::db::index) fn record_range_scan_call() {
464        record_index_store_range_scan_call();
465    }
466
467    const fn bump_generation(&mut self) {
468        self.generation = self.generation.saturating_add(1);
469    }
470
471    fn rebuild_prefix_cardinality_from_entries(&mut self, data_generation: Option<u64>) {
472        self.prefix_cardinality.clear_unsynchronized();
473        let entries = Self::entries_snapshot_for_cardinality(&self.backend);
474        for (key, value) in &entries {
475            self.prefix_cardinality.apply_insert(key, None, value);
476        }
477        if let Some(data_generation) = data_generation {
478            self.prefix_cardinality.mark_synchronized(data_generation);
479        }
480    }
481
482    fn entries_snapshot_for_cardinality(
483        backend: &IndexStoreBackend,
484    ) -> HeapBTreeMap<RawIndexStoreKey, IndexEntryValue> {
485        match backend {
486            IndexStoreBackend::Heap(map) => map.clone(),
487            IndexStoreBackend::Journaled { .. } => {
488                Self::journaled_entries_snapshot_for_fold(backend)
489            }
490        }
491    }
492
493    #[cfg(test)]
494    #[must_use]
495    pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
496        match &self.backend {
497            IndexStoreBackend::Journaled { canonical: map, .. } => map.len(),
498            IndexStoreBackend::Heap(_) => 0,
499        }
500    }
501
502    fn journaled_get(
503        backend: &IndexStoreBackend,
504        key: &RawIndexStoreKey,
505    ) -> Option<IndexEntryValue> {
506        let IndexStoreBackend::Journaled {
507            canonical,
508            live,
509            tombstones,
510        } = backend
511        else {
512            return None;
513        };
514
515        if tombstones.contains(key) {
516            return None;
517        }
518        live.get(key).cloned().or_else(|| canonical.get(key))
519    }
520
521    pub(super) fn journaled_entries_snapshot_for_fold(
522        backend: &IndexStoreBackend,
523    ) -> HeapBTreeMap<RawIndexStoreKey, IndexEntryValue> {
524        #[cfg(test)]
525        record_journaled_snapshot_call();
526
527        let IndexStoreBackend::Journaled {
528            canonical,
529            live,
530            tombstones,
531        } = backend
532        else {
533            return HeapBTreeMap::new();
534        };
535
536        let mut entries = HeapBTreeMap::new();
537        for entry in canonical.iter() {
538            let key = entry.key().clone();
539            if !tombstones.contains(&key) {
540                entries.insert(key, entry.value());
541            }
542        }
543        for (key, value) in live {
544            if !tombstones.contains(key) {
545                entries.insert(key.clone(), value.clone());
546            }
547        }
548
549        entries
550    }
551
552    pub(super) fn visit_journaled_entries_in_range<E>(
553        &self,
554        bounds: (&Bound<RawIndexStoreKey>, &Bound<RawIndexStoreKey>),
555        direction: Direction,
556        mut visit: impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<bool, E>,
557    ) -> Result<(), E> {
558        let IndexStoreBackend::Journaled {
559            canonical,
560            live,
561            tombstones,
562        } = &self.backend
563        else {
564            return Ok(());
565        };
566
567        let lower = bounds.0.clone();
568        let upper = bounds.1.clone();
569        match direction {
570            Direction::Asc if canonical.is_empty() => {
571                for (key, value) in live.range((lower, upper)) {
572                    if visit_index_store_entry(key, value, &mut visit)? {
573                        return Ok(());
574                    }
575                }
576            }
577            Direction::Desc if canonical.is_empty() => {
578                for (key, value) in live.range((lower, upper)).rev() {
579                    if visit_index_store_entry(key, value, &mut visit)? {
580                        return Ok(());
581                    }
582                }
583            }
584            Direction::Asc if live.is_empty() && tombstones.is_empty() => {
585                for entry in canonical.range((lower, upper)) {
586                    if visit_index_store_entry(entry.key(), &entry.value(), &mut visit)? {
587                        return Ok(());
588                    }
589                }
590            }
591            Direction::Desc if live.is_empty() && tombstones.is_empty() => {
592                for entry in canonical.range((lower, upper)).rev() {
593                    if visit_index_store_entry(entry.key(), &entry.value(), &mut visit)? {
594                        return Ok(());
595                    }
596                }
597            }
598            Direction::Asc => {
599                visit_ordered_overlay(
600                    canonical.range((lower.clone(), upper.clone())),
601                    live.range((lower, upper)),
602                    direction,
603                    |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
604                    |canonical_entry| !tombstones.contains(canonical_entry.key()),
605                    |live_entry| !tombstones.contains(live_entry.0),
606                    |entry| {
607                        let should_stop = match entry {
608                            OrderedOverlayEntry::Canonical(canonical_entry) => {
609                                visit_index_store_entry(
610                                    canonical_entry.key(),
611                                    &canonical_entry.value(),
612                                    &mut visit,
613                                )?
614                            }
615                            OrderedOverlayEntry::Live((key, value)) => {
616                                visit_index_store_entry(key, value, &mut visit)?
617                            }
618                        };
619                        Ok(if should_stop {
620                            OrderedOverlayVisit::Stop
621                        } else {
622                            OrderedOverlayVisit::Continue
623                        })
624                    },
625                )?;
626            }
627            Direction::Desc => {
628                visit_ordered_overlay(
629                    canonical.range((lower.clone(), upper.clone())).rev(),
630                    live.range((lower, upper)).rev(),
631                    direction,
632                    |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
633                    |canonical_entry| !tombstones.contains(canonical_entry.key()),
634                    |live_entry| !tombstones.contains(live_entry.0),
635                    |entry| {
636                        let should_stop = match entry {
637                            OrderedOverlayEntry::Canonical(canonical_entry) => {
638                                visit_index_store_entry(
639                                    canonical_entry.key(),
640                                    &canonical_entry.value(),
641                                    &mut visit,
642                                )?
643                            }
644                            OrderedOverlayEntry::Live((key, value)) => {
645                                visit_index_store_entry(key, value, &mut visit)?
646                            }
647                        };
648                        Ok(if should_stop {
649                            OrderedOverlayVisit::Stop
650                        } else {
651                            OrderedOverlayVisit::Continue
652                        })
653                    },
654                )?;
655            }
656        }
657
658        Ok(())
659    }
660}
661
662#[cfg(test)]
663mod tests {
664    use super::*;
665    use crate::{
666        db::{
667            direction::Direction,
668            index::{IndexId, IndexKey, IndexKeyKind},
669            key_taxonomy::{PrimaryKeyComponent, PrimaryKeyValue},
670        },
671        testing::test_memory,
672        traits::Storable,
673        types::EntityTag,
674    };
675    use std::{borrow::Cow, convert::Infallible};
676
677    fn raw_key(value: u8) -> RawIndexStoreKey {
678        <RawIndexStoreKey as Storable>::from_bytes(Cow::Owned(vec![value]))
679    }
680
681    fn indexed_raw_key(
682        index_id: &IndexId,
683        components: Vec<Vec<u8>>,
684        primary_key: u64,
685    ) -> RawIndexStoreKey {
686        indexed_raw_key_with_kind(index_id, IndexKeyKind::User, components, primary_key)
687    }
688
689    fn indexed_raw_key_with_kind(
690        index_id: &IndexId,
691        key_kind: IndexKeyKind,
692        components: Vec<Vec<u8>>,
693        primary_key: u64,
694    ) -> RawIndexStoreKey {
695        IndexKey::new_from_components_with_primary_key_value(
696            index_id,
697            key_kind,
698            components.as_slice(),
699            &PrimaryKeyValue::from(PrimaryKeyComponent::Nat64(primary_key)),
700        )
701        .to_raw()
702    }
703
704    fn malformed_index_entry_value() -> IndexEntryValue {
705        <IndexEntryValue as Storable>::from_bytes(Cow::Owned(vec![0xFF]))
706    }
707
708    fn missing_index_entry_value() -> IndexEntryValue {
709        <IndexEntryValue as Storable>::from_bytes(Cow::Owned(vec![1]))
710    }
711
712    #[test]
713    fn index_prefix_cardinality_requires_explicit_data_generation_sync() {
714        let index_id = IndexId::new(EntityTag::new(0xCA7D), 1);
715        let collection = b"collection-a".to_vec();
716        let draft = b"Draft".to_vec();
717        let review = b"Review".to_vec();
718        let mut store = IndexStore::init_heap();
719
720        store.insert(
721            indexed_raw_key(&index_id, vec![collection.clone(), draft.clone()], 1),
722            IndexEntryValue::presence(),
723        );
724        store.insert(
725            indexed_raw_key(&index_id, vec![collection.clone(), draft.clone()], 2),
726            IndexEntryValue::presence(),
727        );
728        store.insert(
729            indexed_raw_key(&index_id, vec![collection.clone(), review.clone()], 3),
730            IndexEntryValue::presence(),
731        );
732
733        assert_eq!(
734            store.exact_prefix_cardinality(
735                0,
736                IndexKeyKind::User,
737                index_id,
738                std::slice::from_ref(&collection),
739            ),
740            None,
741            "raw index mutations must not be trusted until row generation sync is stamped",
742        );
743
744        store.mark_prefix_cardinality_data_generation(7);
745
746        assert_eq!(
747            store.exact_prefix_cardinality(
748                7,
749                IndexKeyKind::User,
750                index_id,
751                std::slice::from_ref(&collection),
752            ),
753            Some(3),
754        );
755        assert_eq!(
756            store.exact_prefix_cardinality(
757                7,
758                IndexKeyKind::User,
759                index_id,
760                &[collection.clone(), draft],
761            ),
762            Some(2),
763        );
764        assert_eq!(
765            store.exact_prefix_cardinality(8, IndexKeyKind::User, index_id, &[collection, review],),
766            None,
767            "row generation drift should force the caller to use the existing-row fallback",
768        );
769    }
770
771    #[test]
772    fn index_prefix_cardinality_ignores_system_index_mutations() {
773        let user_index_id = IndexId::new(EntityTag::new(0xCA7D), 1);
774        let system_index_id = IndexId::new(EntityTag::new(0xCA7D), 2);
775        let collection = b"collection-a".to_vec();
776        let draft = b"Draft".to_vec();
777        let system_component = b"reverse-edge".to_vec();
778        let mut store = IndexStore::init_heap();
779
780        store.insert(
781            indexed_raw_key(&user_index_id, vec![collection.clone(), draft.clone()], 1),
782            IndexEntryValue::presence(),
783        );
784        store.mark_prefix_cardinality_data_generation(7);
785
786        assert_eq!(
787            store.exact_prefix_cardinality(
788                7,
789                IndexKeyKind::User,
790                user_index_id,
791                &[collection.clone(), draft.clone()],
792            ),
793            Some(1),
794        );
795
796        let system_key = indexed_raw_key_with_kind(
797            &system_index_id,
798            IndexKeyKind::System,
799            vec![system_component],
800            1,
801        );
802        store.insert(system_key.clone(), IndexEntryValue::presence());
803        assert_eq!(
804            store.exact_prefix_cardinality(
805                7,
806                IndexKeyKind::User,
807                user_index_id,
808                &[collection.clone(), draft.clone()],
809            ),
810            Some(1),
811            "system index writes must not invalidate synchronized user-prefix cardinality",
812        );
813
814        store.remove(&system_key);
815        assert_eq!(
816            store.exact_prefix_cardinality(
817                7,
818                IndexKeyKind::User,
819                user_index_id,
820                &[collection.clone(), draft.clone()],
821            ),
822            Some(1),
823            "system index removals must not invalidate synchronized user-prefix cardinality",
824        );
825
826        let malformed_system_key = indexed_raw_key_with_kind(
827            &system_index_id,
828            IndexKeyKind::System,
829            vec![b"malformed-reverse-edge".to_vec()],
830            2,
831        );
832        store.insert(malformed_system_key.clone(), malformed_index_entry_value());
833        assert_eq!(
834            store.exact_prefix_cardinality(
835                7,
836                IndexKeyKind::User,
837                user_index_id,
838                &[collection.clone(), draft.clone()],
839            ),
840            Some(1),
841            "malformed system index payloads must not invalidate user-prefix cardinality",
842        );
843
844        store.remove(&malformed_system_key);
845        assert_eq!(
846            store.exact_prefix_cardinality(
847                7,
848                IndexKeyKind::User,
849                user_index_id,
850                &[collection.clone(), draft],
851            ),
852            Some(1),
853            "malformed system index removals must not invalidate user-prefix cardinality",
854        );
855
856        let review = b"Review".to_vec();
857        store.insert(
858            indexed_raw_key(&user_index_id, vec![collection.clone(), review.clone()], 2),
859            IndexEntryValue::presence(),
860        );
861        assert_eq!(
862            store.exact_prefix_cardinality(
863                7,
864                IndexKeyKind::User,
865                user_index_id,
866                &[collection, review]
867            ),
868            None,
869            "user-prefix count changes must still require a fresh row-generation stamp",
870        );
871    }
872
/// Verifies that inserting and later removing an entry built with
/// `missing_index_entry_value()` leaves the exact user-prefix cardinality at
/// `Some(1)` when queried with the generation stamped before the mutation.
#[test]
fn index_prefix_cardinality_ignores_missing_user_index_mutations() {
    let index_id = IndexId::new(EntityTag::new(0xCA7D), 1);
    // Both prefix components live in one array so every lookup shares them.
    let prefix = [b"collection-a".to_vec(), b"Draft".to_vec()];
    let mut store = IndexStore::init_heap();

    // Seed one presence entry under the prefix, then stamp generation 7.
    let live_key = indexed_raw_key(&index_id, prefix.to_vec(), 1);
    store.insert(live_key, IndexEntryValue::presence());
    store.mark_prefix_cardinality_data_generation(7);

    // Second key under the same prefix, carrying the "missing" entry value.
    let stale_key = indexed_raw_key(&index_id, prefix.to_vec(), 2);
    store.insert(stale_key.clone(), missing_index_entry_value());
    let after_insert = store.exact_prefix_cardinality(7, IndexKeyKind::User, index_id, &prefix);
    assert_eq!(
        after_insert,
        Some(1),
        "missing user index entries must not affect synchronized prefix cardinality",
    );

    store.remove(&stale_key);
    let after_remove = store.exact_prefix_cardinality(7, IndexKeyKind::User, index_id, &prefix);
    assert_eq!(
        after_remove,
        Some(1),
        "missing user index removals must not affect synchronized prefix cardinality",
    );
}
906
/// Exercises the diagnostics counters exposed by `IndexStore`: point lookups,
/// range-scan probes, and entries yielded by a full traversal. Each counter is
/// read before and after the operation and only the difference is asserted.
#[cfg(feature = "diagnostics")]
#[test]
fn index_store_diagnostic_counters_record_gets_range_scans_and_entry_reads() {
    let mut store = IndexStore::init_heap();
    for value in [7, 9] {
        store.insert(raw_key(value), IndexEntryValue::presence());
    }

    // Two lookups: key 7 is present, key 8 was never inserted.
    let gets_before = IndexStore::current_get_call_count();
    assert_eq!(store.get(&raw_key(7)), Some(IndexEntryValue::presence()));
    assert_eq!(store.get(&raw_key(8)), None);
    let get_calls = IndexStore::current_get_call_count().saturating_sub(gets_before);
    assert_eq!(
        get_calls, 2,
        "diagnostic index-store get counter should count both hit and miss reads",
    );

    // One inclusive range visit over keys 7..=9.
    let range_scans_before = IndexStore::current_range_scan_call_count();
    let bounds = (Bound::Included(raw_key(7)), Bound::Included(raw_key(9)));
    store
        .visit_raw_entries_in_range((&bounds.0, &bounds.1), Direction::Asc, |_key, _entry| {
            Ok(false)
        })
        .expect("raw index range visit should succeed");
    let range_scans =
        IndexStore::current_range_scan_call_count().saturating_sub(range_scans_before);
    assert_eq!(
        range_scans, 1,
        "diagnostic index-store range-scan counter should count one range traversal probe",
    );

    // A full traversal that never stops early should yield both stored entries.
    let entries_before = IndexStore::current_entry_read_count();
    store
        .visit_entries(|_key, _entry| Ok::<_, Infallible>(IndexStoreVisit::Continue))
        .expect("index entry visit should succeed");
    let entries_read = IndexStore::current_entry_read_count().saturating_sub(entries_before);
    assert_eq!(
        entries_read, 2,
        "diagnostic index-store entry counter should count yielded traversal entries",
    );
}
948
/// Builds a journaled store holding folded keys (1, 3, 5) plus later
/// mutations (insert 0, 4, 5; remove 1), then visits the inclusive range
/// 0..=5 in both directions. The visitor returns `true` once two keys have
/// been collected; the test asserts the collected keys and that the snapshot
/// call counter stays at zero for each direction.
#[test]
fn journaled_mixed_index_range_traversal_streams_without_snapshot() {
    let mut store = IndexStore::init_journaled(test_memory(93));
    for value in [1_u8, 3, 5] {
        store.insert(raw_key(value), IndexEntryValue::presence());
    }
    store
        .fold_journaled_materialized_view()
        .expect("canonical index seed should fold");

    // Mutations applied after the fold.
    for value in [0, 4, 5] {
        store.insert(raw_key(value), IndexEntryValue::presence());
    }
    store.remove(&raw_key(1));

    let bounds = (Bound::Included(raw_key(0)), Bound::Included(raw_key(5)));
    let range = (&bounds.0, &bounds.1);

    // One row per direction: expected first two keys and the failure messages.
    let cases = [
        (
            Direction::Asc,
            vec![0, 3],
            "asc journaled index range traversal should succeed",
            "mixed journaled index range traversal should preserve early stop without materializing a snapshot",
        ),
        (
            Direction::Desc,
            vec![5, 4],
            "desc journaled index range traversal should succeed",
            "mixed reverse journaled index range traversal should preserve early stop without materializing a snapshot",
        ),
    ];

    for (direction, expected, visit_msg, snapshot_msg) in cases {
        reset_journaled_snapshot_call_count_for_tests();
        let mut seen = Vec::new();
        store
            .visit_journaled_entries_in_range(range, direction, |key, _value| {
                seen.push(key.as_bytes()[0]);
                Ok::<_, Infallible>(seen.len() == 2)
            })
            .expect(visit_msg);
        assert_eq!(seen, expected);
        assert_eq!(
            journaled_snapshot_call_count_for_tests(),
            0,
            "{}",
            snapshot_msg,
        );
    }
}
997}