Skip to main content

icydb_core/db/index/
store.rs

1//! Module: index::store
2//! Responsibility: journaled-or-heap index-entry storage behind the index-store boundary.
3//! Does not own: range-scan resolution, continuation semantics, or predicate execution.
4//! Boundary: scan/executor layers depend on this storage boundary.
5
6use crate::db::{
7    direction::Direction,
8    index::{
9        IndexEntryValue, IndexId, IndexKeyKind, cardinality::IndexPrefixCardinality,
10        key::RawIndexStoreKey,
11    },
12    ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
13};
14
15use candid::CandidType;
16use ic_memory::stable_structures::{
17    BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
18};
19use serde::Deserialize;
20#[cfg(any(test, feature = "diagnostics"))]
21use std::cell::Cell;
22use std::collections::{BTreeMap as HeapBTreeMap, BTreeSet};
23use std::ops::Bound;
24
25#[cfg(test)]
26thread_local! {
27    static JOURNALED_SNAPSHOT_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
28}
29
30#[cfg(feature = "diagnostics")]
31thread_local! {
32    static INDEX_STORE_GET_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
33}
34
35#[cfg(any(test, feature = "diagnostics"))]
36thread_local! {
37    static INDEX_STORE_RANGE_SCAN_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
38    static INDEX_STORE_ENTRY_READ_COUNT: Cell<u64> = const { Cell::new(0) };
39    static INDEX_STORE_PREFIX_CARDINALITY_LOOKUP_COUNT: Cell<u64> = const { Cell::new(0) };
40}
41
42#[cfg(feature = "diagnostics")]
43fn record_index_store_get_call() {
44    INDEX_STORE_GET_CALL_COUNT.with(|count| {
45        count.set(count.get().saturating_add(1));
46    });
47}
48
49#[cfg(any(test, feature = "diagnostics"))]
50fn record_index_store_range_scan_call() {
51    INDEX_STORE_RANGE_SCAN_CALL_COUNT.with(|count| {
52        count.set(count.get().saturating_add(1));
53    });
54}
55
56#[cfg(any(test, feature = "diagnostics"))]
57fn record_index_store_entry_read() {
58    INDEX_STORE_ENTRY_READ_COUNT.with(|count| {
59        count.set(count.get().saturating_add(1));
60    });
61}
62
63#[cfg(any(test, feature = "diagnostics"))]
64fn record_index_store_prefix_cardinality_lookup() {
65    INDEX_STORE_PREFIX_CARDINALITY_LOOKUP_COUNT.with(|count| {
66        count.set(count.get().saturating_add(1));
67    });
68}
69
70fn visit_index_store_entry<E>(
71    key: &RawIndexStoreKey,
72    value: &IndexEntryValue,
73    visit: &mut impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<bool, E>,
74) -> Result<bool, E> {
75    #[cfg(any(test, feature = "diagnostics"))]
76    record_index_store_entry_read();
77
78    visit(key, value)
79}
80
81#[cfg(test)]
82fn record_journaled_snapshot_call() {
83    JOURNALED_SNAPSHOT_CALL_COUNT.with(|count| {
84        count.set(count.get().saturating_add(1));
85    });
86}
87
88#[cfg(test)]
89fn reset_journaled_snapshot_call_count_for_tests() {
90    JOURNALED_SNAPSHOT_CALL_COUNT.with(|count| count.set(0));
91}
92
93#[cfg(test)]
94fn journaled_snapshot_call_count_for_tests() -> u64 {
95    JOURNALED_SNAPSHOT_CALL_COUNT.with(Cell::get)
96}
97
98//
99// IndexState
100//
101// Explicit lifecycle visibility state for one index store.
102// Visibility matters because planner-visible indexes must already be complete:
103// the index contents are fully built and query-visible for reads.
104//
105#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
106pub enum IndexState {
107    Building,
108    #[default]
109    Ready,
110    Dropping,
111}
112
113impl IndexState {
114    /// Return the stable lowercase text label for this lifecycle state.
115    #[must_use]
116    pub const fn as_str(self) -> &'static str {
117        match self {
118            Self::Building => "building",
119            Self::Ready => "ready",
120            Self::Dropping => "dropping",
121        }
122    }
123}
124
125///
126/// IndexStore
127///
128/// Thin persistence wrapper over one journaled or heap BTreeMap.
129///
130/// Invariant: callers provide already-validated `RawIndexStoreKey`/`IndexEntryValue`.
131///
132
133pub struct IndexStore {
134    pub(super) backend: IndexStoreBackend,
135    generation: u64,
136    state: IndexState,
137    prefix_cardinality: IndexPrefixCardinality,
138}
139
140pub(super) enum IndexStoreBackend {
141    Heap(HeapBTreeMap<RawIndexStoreKey, IndexEntryValue>),
142    Journaled {
143        canonical:
144            StableBTreeMap<RawIndexStoreKey, IndexEntryValue, VirtualMemory<DefaultMemoryImpl>>,
145        live: HeapBTreeMap<RawIndexStoreKey, IndexEntryValue>,
146        tombstones: BTreeSet<RawIndexStoreKey>,
147    },
148}
149
150/// Control-flow result for index-store traversal visitors.
151#[derive(Clone, Copy, Debug, Eq, PartialEq)]
152pub(in crate::db) enum IndexStoreVisit {
153    Continue,
154    Stop,
155}
156
157impl IndexStoreVisit {
158    const fn should_stop(self) -> bool {
159        matches!(self, Self::Stop)
160    }
161}
162
163impl IndexStore {
164    /// Initialize a volatile heap-backed index store.
165    #[must_use]
166    pub const fn init_heap() -> Self {
167        Self {
168            backend: IndexStoreBackend::Heap(HeapBTreeMap::new()),
169            generation: 0,
170            state: IndexState::Ready,
171            prefix_cardinality: IndexPrefixCardinality::synchronized_empty(),
172        }
173    }
174
175    /// Initialize a journaled cached-stable index store.
176    ///
177    /// Normal writes update only the live materialized projection. The
178    /// canonical stable index is updated by future fold/rebuild paths.
179    #[must_use]
180    pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
181        let mut store = Self {
182            backend: IndexStoreBackend::Journaled {
183                canonical: StableBTreeMap::init(memory),
184                live: HeapBTreeMap::new(),
185                tombstones: BTreeSet::new(),
186            },
187            generation: 0,
188            state: IndexState::Ready,
189            prefix_cardinality: IndexPrefixCardinality::synchronized_empty(),
190        };
191        store.rebuild_prefix_cardinality_from_entries(Some(0));
192        store
193    }
194
195    /// Visit all index entries in canonical store order without exposing the
196    /// backing stable-map iterator.
197    pub(in crate::db) fn visit_entries<E>(
198        &self,
199        mut visitor: impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<IndexStoreVisit, E>,
200    ) -> Result<(), E> {
201        match &self.backend {
202            IndexStoreBackend::Heap(map) => {
203                for (key, value) in map {
204                    #[cfg(any(test, feature = "diagnostics"))]
205                    record_index_store_entry_read();
206
207                    if visitor(key, value)?.should_stop() {
208                        return Ok(());
209                    }
210                }
211            }
212            IndexStoreBackend::Journaled {
213                canonical: _,
214                live: _,
215                tombstones: _,
216            } => self.visit_journaled_entries_in_range(
217                (&Bound::Unbounded, &Bound::Unbounded),
218                Direction::Asc,
219                |key, value| visitor(key, value).map(IndexStoreVisit::should_stop),
220            )?,
221        }
222
223        Ok(())
224    }
225
226    pub(in crate::db) fn get(&self, key: &RawIndexStoreKey) -> Option<IndexEntryValue> {
227        #[cfg(feature = "diagnostics")]
228        record_index_store_get_call();
229
230        match &self.backend {
231            IndexStoreBackend::Heap(map) => map.get(key).cloned(),
232            IndexStoreBackend::Journaled { .. } => Self::journaled_get(&self.backend, key),
233        }
234    }
235
236    pub fn len(&self) -> u64 {
237        match &self.backend {
238            IndexStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
239            IndexStoreBackend::Journaled { .. } => {
240                let mut count = 0_u64;
241                let _: Result<(), std::convert::Infallible> = self.visit_entries(|_key, _value| {
242                    count = count.saturating_add(1);
243                    Ok(IndexStoreVisit::Continue)
244                });
245                count
246            }
247        }
248    }
249
250    pub fn is_empty(&self) -> bool {
251        match &self.backend {
252            IndexStoreBackend::Heap(map) => map.is_empty(),
253            IndexStoreBackend::Journaled { .. } => {
254                let mut empty = true;
255                let _: Result<(), std::convert::Infallible> = self.visit_entries(|_key, _value| {
256                    empty = false;
257                    Ok(IndexStoreVisit::Stop)
258                });
259                empty
260            }
261        }
262    }
263
264    #[must_use]
265    pub(in crate::db) const fn generation(&self) -> u64 {
266        self.generation
267    }
268
269    /// Return the explicit lifecycle state for this index store.
270    #[must_use]
271    pub(in crate::db) const fn state(&self) -> IndexState {
272        self.state
273    }
274
275    /// Return an exact user-index prefix count when the index metadata is
276    /// synchronized with the caller's authoritative row-store generation.
277    #[must_use]
278    pub(in crate::db) fn exact_prefix_cardinality(
279        &self,
280        data_generation: u64,
281        key_kind: IndexKeyKind,
282        index_id: IndexId,
283        components: &[Vec<u8>],
284    ) -> Option<u64> {
285        #[cfg(any(test, feature = "diagnostics"))]
286        record_index_store_prefix_cardinality_lookup();
287
288        self.prefix_cardinality
289            .exact_count(data_generation, key_kind, index_id, components)
290    }
291
292    /// Return the sum of exact prefix counts for prefixes on the same index
293    /// when synchronized metadata can prove all requested counts.
294    #[must_use]
295    pub(in crate::db) fn exact_prefix_cardinality_sum<'a>(
296        &self,
297        data_generation: u64,
298        key_kind: IndexKeyKind,
299        index_id: IndexId,
300        component_prefixes: impl IntoIterator<Item = &'a [Vec<u8>]>,
301        stop_after: Option<u64>,
302    ) -> Option<u64> {
303        #[cfg(any(test, feature = "diagnostics"))]
304        record_index_store_prefix_cardinality_lookup();
305
306        self.prefix_cardinality.exact_count_sum(
307            data_generation,
308            key_kind,
309            index_id,
310            component_prefixes,
311            stop_after,
312        )
313    }
314
315    /// Return non-empty exact child prefixes under one already-encoded parent
316    /// prefix when synchronized metadata can prove the bounded child set.
317    #[must_use]
318    pub(in crate::db) fn exact_child_prefixes(
319        &self,
320        data_generation: u64,
321        key_kind: IndexKeyKind,
322        index_id: IndexId,
323        parent_components: &[Vec<u8>],
324        max_children: usize,
325    ) -> Option<Vec<Vec<Vec<u8>>>> {
326        #[cfg(any(test, feature = "diagnostics"))]
327        record_index_store_prefix_cardinality_lookup();
328
329        self.prefix_cardinality.exact_child_prefixes(
330            data_generation,
331            key_kind,
332            index_id,
333            parent_components,
334            max_children,
335        )
336    }
337
338    /// Mark prefix-cardinality metadata synchronized with the authoritative
339    /// row-store generation after a committed row/index transition.
340    pub(in crate::db) const fn mark_prefix_cardinality_data_generation(&mut self, generation: u64) {
341        self.prefix_cardinality.mark_synchronized(generation);
342    }
343
344    /// Mark this index store as in-progress and therefore ineligible for
345    /// planner visibility until a full authoritative rebuild ends.
346    pub(in crate::db) const fn mark_building(&mut self) {
347        self.state = IndexState::Building;
348    }
349
350    /// Mark this index store as fully built and planner-visible again.
351    pub(in crate::db) const fn mark_ready(&mut self) {
352        self.state = IndexState::Ready;
353    }
354
355    /// Mark this index store as dropping and therefore not planner-visible.
356    pub(in crate::db) const fn mark_dropping(&mut self) {
357        self.state = IndexState::Dropping;
358    }
359
360    pub(crate) fn insert(
361        &mut self,
362        key: RawIndexStoreKey,
363        entry: IndexEntryValue,
364    ) -> Option<IndexEntryValue> {
365        let previous_journaled = if matches!(self.backend, IndexStoreBackend::Journaled { .. }) {
366            self.get(&key)
367        } else {
368            None
369        };
370        let cardinality_key = key.clone();
371        let previous = match &mut self.backend {
372            IndexStoreBackend::Heap(map) => map.insert(key, entry.clone()),
373            IndexStoreBackend::Journaled {
374                live, tombstones, ..
375            } => {
376                tombstones.remove(&key);
377                live.insert(key, entry.clone());
378                previous_journaled
379            }
380        };
381        self.prefix_cardinality
382            .apply_insert(&cardinality_key, previous.as_ref(), &entry);
383        self.bump_generation();
384        previous
385    }
386
387    pub(crate) fn remove(&mut self, key: &RawIndexStoreKey) -> Option<IndexEntryValue> {
388        let previous_journaled = if matches!(self.backend, IndexStoreBackend::Journaled { .. }) {
389            self.get(key)
390        } else {
391            None
392        };
393        let previous = match &mut self.backend {
394            IndexStoreBackend::Heap(map) => map.remove(key),
395            IndexStoreBackend::Journaled {
396                live, tombstones, ..
397            } => {
398                live.remove(key);
399                tombstones.insert(key.clone());
400                previous_journaled
401            }
402        };
403        self.prefix_cardinality.apply_remove(key, previous.as_ref());
404        self.bump_generation();
405        previous
406    }
407
408    pub fn clear(&mut self) {
409        match &mut self.backend {
410            IndexStoreBackend::Heap(map) => map.clear(),
411            IndexStoreBackend::Journaled {
412                canonical,
413                live,
414                tombstones,
415            } => {
416                live.clear();
417                tombstones.clear();
418                for entry in canonical.iter() {
419                    tombstones.insert(entry.key().clone());
420                }
421            }
422        }
423        self.prefix_cardinality.clear_unsynchronized();
424        self.bump_generation();
425    }
426
427    /// Fold the current journaled materialized index view into the canonical
428    /// stable base and clear volatile projection state.
429    pub(in crate::db) fn fold_journaled_materialized_view(
430        &mut self,
431    ) -> Result<(), crate::error::InternalError> {
432        let entries = Self::journaled_entries_snapshot_for_fold(&self.backend);
433        let IndexStoreBackend::Journaled {
434            canonical,
435            live,
436            tombstones,
437        } = &mut self.backend
438        else {
439            return Err(crate::error::InternalError::store_invariant());
440        };
441
442        canonical.clear_new();
443        for (key, value) in entries {
444            canonical.insert(key, value);
445        }
446        live.clear();
447        tombstones.clear();
448        let data_generation = self.prefix_cardinality.synchronized_generation();
449        self.rebuild_prefix_cardinality_from_entries(data_generation);
450        self.bump_generation();
451
452        Ok(())
453    }
454
455    /// Sum of bytes used by all stored index entries.
456    pub fn memory_bytes(&self) -> u64 {
457        let mut bytes = 0u64;
458        let _: Result<(), std::convert::Infallible> = self.visit_entries(|key, value| {
459            bytes = bytes.saturating_add(key.as_bytes().len() as u64 + value.len() as u64);
460            Ok(IndexStoreVisit::Continue)
461        });
462        bytes
463    }
464
465    /// Return the monotonic perf-only count of index-entry fetches seen by this process.
466    #[cfg(feature = "diagnostics")]
467    pub(in crate::db) fn current_get_call_count() -> u64 {
468        INDEX_STORE_GET_CALL_COUNT.with(Cell::get)
469    }
470
471    /// Return the monotonic perf-only count of index range traversal probes seen by this process.
472    #[cfg(any(test, feature = "diagnostics"))]
473    pub(in crate::db) fn current_range_scan_call_count() -> u64 {
474        INDEX_STORE_RANGE_SCAN_CALL_COUNT.with(Cell::get)
475    }
476
477    /// Return the monotonic perf-only count of index entries yielded by traversal.
478    #[cfg(any(test, feature = "diagnostics"))]
479    pub(in crate::db) fn current_entry_read_count() -> u64 {
480        INDEX_STORE_ENTRY_READ_COUNT.with(Cell::get)
481    }
482
483    /// Return the monotonic perf-only count of exact prefix-cardinality probes.
484    #[cfg(test)]
485    pub(in crate::db) fn current_prefix_cardinality_lookup_count() -> u64 {
486        INDEX_STORE_PREFIX_CARDINALITY_LOOKUP_COUNT.with(Cell::get)
487    }
488
489    #[cfg(any(test, feature = "diagnostics"))]
490    pub(in crate::db::index) fn record_range_scan_call() {
491        record_index_store_range_scan_call();
492    }
493
494    const fn bump_generation(&mut self) {
495        self.generation = self.generation.saturating_add(1);
496    }
497
498    fn rebuild_prefix_cardinality_from_entries(&mut self, data_generation: Option<u64>) {
499        self.prefix_cardinality.clear_unsynchronized();
500        let entries = Self::entries_snapshot_for_cardinality(&self.backend);
501        for (key, value) in &entries {
502            self.prefix_cardinality.apply_insert(key, None, value);
503        }
504        if let Some(data_generation) = data_generation {
505            self.prefix_cardinality.mark_synchronized(data_generation);
506        }
507    }
508
509    fn entries_snapshot_for_cardinality(
510        backend: &IndexStoreBackend,
511    ) -> HeapBTreeMap<RawIndexStoreKey, IndexEntryValue> {
512        match backend {
513            IndexStoreBackend::Heap(map) => map.clone(),
514            IndexStoreBackend::Journaled { .. } => {
515                Self::journaled_entries_snapshot_for_fold(backend)
516            }
517        }
518    }
519
520    #[cfg(test)]
521    #[must_use]
522    pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
523        match &self.backend {
524            IndexStoreBackend::Journaled { canonical: map, .. } => map.len(),
525            IndexStoreBackend::Heap(_) => 0,
526        }
527    }
528
529    fn journaled_get(
530        backend: &IndexStoreBackend,
531        key: &RawIndexStoreKey,
532    ) -> Option<IndexEntryValue> {
533        let IndexStoreBackend::Journaled {
534            canonical,
535            live,
536            tombstones,
537        } = backend
538        else {
539            return None;
540        };
541
542        if tombstones.contains(key) {
543            return None;
544        }
545        live.get(key).cloned().or_else(|| canonical.get(key))
546    }
547
548    pub(super) fn journaled_entries_snapshot_for_fold(
549        backend: &IndexStoreBackend,
550    ) -> HeapBTreeMap<RawIndexStoreKey, IndexEntryValue> {
551        #[cfg(test)]
552        record_journaled_snapshot_call();
553
554        let IndexStoreBackend::Journaled {
555            canonical,
556            live,
557            tombstones,
558        } = backend
559        else {
560            return HeapBTreeMap::new();
561        };
562
563        let mut entries = HeapBTreeMap::new();
564        for entry in canonical.iter() {
565            let key = entry.key().clone();
566            if !tombstones.contains(&key) {
567                entries.insert(key, entry.value());
568            }
569        }
570        for (key, value) in live {
571            if !tombstones.contains(key) {
572                entries.insert(key.clone(), value.clone());
573            }
574        }
575
576        entries
577    }
578
579    pub(super) fn visit_journaled_entries_in_range<E>(
580        &self,
581        bounds: (&Bound<RawIndexStoreKey>, &Bound<RawIndexStoreKey>),
582        direction: Direction,
583        mut visit: impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<bool, E>,
584    ) -> Result<(), E> {
585        let IndexStoreBackend::Journaled {
586            canonical,
587            live,
588            tombstones,
589        } = &self.backend
590        else {
591            return Ok(());
592        };
593
594        let lower = bounds.0.clone();
595        let upper = bounds.1.clone();
596        match direction {
597            Direction::Asc if canonical.is_empty() => {
598                for (key, value) in live.range((lower, upper)) {
599                    if visit_index_store_entry(key, value, &mut visit)? {
600                        return Ok(());
601                    }
602                }
603            }
604            Direction::Desc if canonical.is_empty() => {
605                for (key, value) in live.range((lower, upper)).rev() {
606                    if visit_index_store_entry(key, value, &mut visit)? {
607                        return Ok(());
608                    }
609                }
610            }
611            Direction::Asc if live.is_empty() && tombstones.is_empty() => {
612                for entry in canonical.range((lower, upper)) {
613                    if visit_index_store_entry(entry.key(), &entry.value(), &mut visit)? {
614                        return Ok(());
615                    }
616                }
617            }
618            Direction::Desc if live.is_empty() && tombstones.is_empty() => {
619                for entry in canonical.range((lower, upper)).rev() {
620                    if visit_index_store_entry(entry.key(), &entry.value(), &mut visit)? {
621                        return Ok(());
622                    }
623                }
624            }
625            Direction::Asc => {
626                visit_ordered_overlay(
627                    canonical.range((lower.clone(), upper.clone())),
628                    live.range((lower, upper)),
629                    direction,
630                    |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
631                    |canonical_entry| !tombstones.contains(canonical_entry.key()),
632                    |live_entry| !tombstones.contains(live_entry.0),
633                    |entry| {
634                        let should_stop = match entry {
635                            OrderedOverlayEntry::Canonical(canonical_entry) => {
636                                visit_index_store_entry(
637                                    canonical_entry.key(),
638                                    &canonical_entry.value(),
639                                    &mut visit,
640                                )?
641                            }
642                            OrderedOverlayEntry::Live((key, value)) => {
643                                visit_index_store_entry(key, value, &mut visit)?
644                            }
645                        };
646                        Ok(if should_stop {
647                            OrderedOverlayVisit::Stop
648                        } else {
649                            OrderedOverlayVisit::Continue
650                        })
651                    },
652                )?;
653            }
654            Direction::Desc => {
655                visit_ordered_overlay(
656                    canonical.range((lower.clone(), upper.clone())).rev(),
657                    live.range((lower, upper)).rev(),
658                    direction,
659                    |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
660                    |canonical_entry| !tombstones.contains(canonical_entry.key()),
661                    |live_entry| !tombstones.contains(live_entry.0),
662                    |entry| {
663                        let should_stop = match entry {
664                            OrderedOverlayEntry::Canonical(canonical_entry) => {
665                                visit_index_store_entry(
666                                    canonical_entry.key(),
667                                    &canonical_entry.value(),
668                                    &mut visit,
669                                )?
670                            }
671                            OrderedOverlayEntry::Live((key, value)) => {
672                                visit_index_store_entry(key, value, &mut visit)?
673                            }
674                        };
675                        Ok(if should_stop {
676                            OrderedOverlayVisit::Stop
677                        } else {
678                            OrderedOverlayVisit::Continue
679                        })
680                    },
681                )?;
682            }
683        }
684
685        Ok(())
686    }
687}
688
689#[cfg(test)]
690mod tests {
691    use super::*;
692    use crate::{
693        db::{
694            direction::Direction,
695            index::{IndexId, IndexKey, IndexKeyKind},
696            key_taxonomy::{PrimaryKeyComponent, PrimaryKeyValue},
697        },
698        testing::test_memory,
699        traits::Storable,
700        types::EntityTag,
701    };
702    use std::{borrow::Cow, convert::Infallible};
703
704    fn raw_key(value: u8) -> RawIndexStoreKey {
705        <RawIndexStoreKey as Storable>::from_bytes(Cow::Owned(vec![value]))
706    }
707
708    fn indexed_raw_key(
709        index_id: &IndexId,
710        components: Vec<Vec<u8>>,
711        primary_key: u64,
712    ) -> RawIndexStoreKey {
713        indexed_raw_key_with_kind(index_id, IndexKeyKind::User, components, primary_key)
714    }
715
716    fn indexed_raw_key_with_kind(
717        index_id: &IndexId,
718        key_kind: IndexKeyKind,
719        components: Vec<Vec<u8>>,
720        primary_key: u64,
721    ) -> RawIndexStoreKey {
722        IndexKey::new_from_components_with_primary_key_value(
723            index_id,
724            key_kind,
725            components.as_slice(),
726            &PrimaryKeyValue::from(PrimaryKeyComponent::Nat64(primary_key)),
727        )
728        .expect("test index key should build")
729        .to_raw()
730        .expect("test index key should encode")
731    }
732
733    fn malformed_index_entry_value() -> IndexEntryValue {
734        <IndexEntryValue as Storable>::from_bytes(Cow::Owned(vec![0xFF]))
735    }
736
737    fn missing_index_entry_value() -> IndexEntryValue {
738        <IndexEntryValue as Storable>::from_bytes(Cow::Owned(vec![1]))
739    }
740
741    #[test]
742    fn index_prefix_cardinality_requires_explicit_data_generation_sync() {
743        let index_id = IndexId::new(EntityTag::new(0xCA7D), 1);
744        let collection = b"collection-a".to_vec();
745        let draft = b"Draft".to_vec();
746        let review = b"Review".to_vec();
747        let mut store = IndexStore::init_heap();
748
749        store.insert(
750            indexed_raw_key(&index_id, vec![collection.clone(), draft.clone()], 1),
751            IndexEntryValue::presence(),
752        );
753        store.insert(
754            indexed_raw_key(&index_id, vec![collection.clone(), draft.clone()], 2),
755            IndexEntryValue::presence(),
756        );
757        store.insert(
758            indexed_raw_key(&index_id, vec![collection.clone(), review.clone()], 3),
759            IndexEntryValue::presence(),
760        );
761
762        assert_eq!(
763            store.exact_prefix_cardinality(
764                0,
765                IndexKeyKind::User,
766                index_id,
767                std::slice::from_ref(&collection),
768            ),
769            None,
770            "raw index mutations must not be trusted until row generation sync is stamped",
771        );
772
773        store.mark_prefix_cardinality_data_generation(7);
774
775        assert_eq!(
776            store.exact_prefix_cardinality(
777                7,
778                IndexKeyKind::User,
779                index_id,
780                std::slice::from_ref(&collection),
781            ),
782            Some(3),
783        );
784        assert_eq!(
785            store.exact_prefix_cardinality(
786                7,
787                IndexKeyKind::User,
788                index_id,
789                &[collection.clone(), draft],
790            ),
791            Some(2),
792        );
793        assert_eq!(
794            store.exact_prefix_cardinality(8, IndexKeyKind::User, index_id, &[collection, review],),
795            None,
796            "row generation drift should force the caller to use the existing-row fallback",
797        );
798    }
799
800    #[test]
801    fn index_prefix_cardinality_enumerates_bounded_child_prefixes() {
802        let index_id = IndexId::new(EntityTag::new(0xCA7D), 1);
803        let collection = b"collection-a".to_vec();
804        let other_collection = b"collection-b".to_vec();
805        let draft = b"Draft".to_vec();
806        let review = b"Review".to_vec();
807        let published = b"Published".to_vec();
808        let mut store = IndexStore::init_heap();
809
810        store.insert(
811            indexed_raw_key(&index_id, vec![collection.clone(), draft.clone()], 1),
812            IndexEntryValue::presence(),
813        );
814        store.insert(
815            indexed_raw_key(&index_id, vec![collection.clone(), draft.clone()], 2),
816            IndexEntryValue::presence(),
817        );
818        store.insert(
819            indexed_raw_key(&index_id, vec![collection.clone(), review.clone()], 3),
820            IndexEntryValue::presence(),
821        );
822        store.insert(
823            indexed_raw_key(
824                &index_id,
825                vec![other_collection.clone(), published.clone()],
826                4,
827            ),
828            IndexEntryValue::presence(),
829        );
830        store.mark_prefix_cardinality_data_generation(7);
831
832        assert_eq!(
833            store.exact_child_prefixes(
834                7,
835                IndexKeyKind::User,
836                index_id,
837                std::slice::from_ref(&collection),
838                4,
839            ),
840            Some(vec![
841                vec![collection.clone(), draft],
842                vec![collection.clone(), review],
843            ]),
844            "child-prefix enumeration should return deterministic unique children under the requested parent",
845        );
846        assert_eq!(
847            store.exact_child_prefixes(
848                7,
849                IndexKeyKind::User,
850                index_id,
851                std::slice::from_ref(&other_collection),
852                4,
853            ),
854            Some(vec![vec![other_collection, published]]),
855            "child-prefix enumeration must stay scoped to the requested parent prefix",
856        );
857        assert_eq!(
858            store.exact_child_prefixes(
859                8,
860                IndexKeyKind::User,
861                index_id,
862                std::slice::from_ref(&collection),
863                4,
864            ),
865            None,
866            "row generation drift should keep child-prefix expansion fail-closed",
867        );
868        assert_eq!(
869            store.exact_child_prefixes(
870                7,
871                IndexKeyKind::User,
872                index_id,
873                std::slice::from_ref(&collection),
874                1,
875            ),
876            None,
877            "over-cap child-prefix expansion should fall back to the existing route",
878        );
879    }
880
881    #[test]
882    fn index_prefix_cardinality_ignores_system_index_mutations() {
883        let user_index_id = IndexId::new(EntityTag::new(0xCA7D), 1);
884        let system_index_id = IndexId::new(EntityTag::new(0xCA7D), 2);
885        let collection = b"collection-a".to_vec();
886        let draft = b"Draft".to_vec();
887        let system_component = b"reverse-edge".to_vec();
888        let mut store = IndexStore::init_heap();
889
890        store.insert(
891            indexed_raw_key(&user_index_id, vec![collection.clone(), draft.clone()], 1),
892            IndexEntryValue::presence(),
893        );
894        store.mark_prefix_cardinality_data_generation(7);
895
896        assert_eq!(
897            store.exact_prefix_cardinality(
898                7,
899                IndexKeyKind::User,
900                user_index_id,
901                &[collection.clone(), draft.clone()],
902            ),
903            Some(1),
904        );
905
906        let system_key = indexed_raw_key_with_kind(
907            &system_index_id,
908            IndexKeyKind::System,
909            vec![system_component],
910            1,
911        );
912        store.insert(system_key.clone(), IndexEntryValue::presence());
913        assert_eq!(
914            store.exact_prefix_cardinality(
915                7,
916                IndexKeyKind::User,
917                user_index_id,
918                &[collection.clone(), draft.clone()],
919            ),
920            Some(1),
921            "system index writes must not invalidate synchronized user-prefix cardinality",
922        );
923
924        store.remove(&system_key);
925        assert_eq!(
926            store.exact_prefix_cardinality(
927                7,
928                IndexKeyKind::User,
929                user_index_id,
930                &[collection.clone(), draft.clone()],
931            ),
932            Some(1),
933            "system index removals must not invalidate synchronized user-prefix cardinality",
934        );
935
936        let malformed_system_key = indexed_raw_key_with_kind(
937            &system_index_id,
938            IndexKeyKind::System,
939            vec![b"malformed-reverse-edge".to_vec()],
940            2,
941        );
942        store.insert(malformed_system_key.clone(), malformed_index_entry_value());
943        assert_eq!(
944            store.exact_prefix_cardinality(
945                7,
946                IndexKeyKind::User,
947                user_index_id,
948                &[collection.clone(), draft.clone()],
949            ),
950            Some(1),
951            "malformed system index payloads must not invalidate user-prefix cardinality",
952        );
953
954        store.remove(&malformed_system_key);
955        assert_eq!(
956            store.exact_prefix_cardinality(
957                7,
958                IndexKeyKind::User,
959                user_index_id,
960                &[collection.clone(), draft],
961            ),
962            Some(1),
963            "malformed system index removals must not invalidate user-prefix cardinality",
964        );
965
966        let review = b"Review".to_vec();
967        store.insert(
968            indexed_raw_key(&user_index_id, vec![collection.clone(), review.clone()], 2),
969            IndexEntryValue::presence(),
970        );
971        assert_eq!(
972            store.exact_prefix_cardinality(
973                7,
974                IndexKeyKind::User,
975                user_index_id,
976                &[collection, review]
977            ),
978            None,
979            "user-prefix count changes must still require a fresh row-generation stamp",
980        );
981    }
982
983    #[test]
984    fn index_prefix_cardinality_ignores_missing_user_index_mutations() {
985        let index_id = IndexId::new(EntityTag::new(0xCA7D), 1);
986        let collection = b"collection-a".to_vec();
987        let draft = b"Draft".to_vec();
988        let mut store = IndexStore::init_heap();
989
990        store.insert(
991            indexed_raw_key(&index_id, vec![collection.clone(), draft.clone()], 1),
992            IndexEntryValue::presence(),
993        );
994        store.mark_prefix_cardinality_data_generation(7);
995
996        let stale_key = indexed_raw_key(&index_id, vec![collection.clone(), draft.clone()], 2);
997        store.insert(stale_key.clone(), missing_index_entry_value());
998        assert_eq!(
999            store.exact_prefix_cardinality(
1000                7,
1001                IndexKeyKind::User,
1002                index_id,
1003                &[collection.clone(), draft.clone()],
1004            ),
1005            Some(1),
1006            "missing user index entries must not affect synchronized prefix cardinality",
1007        );
1008
1009        store.remove(&stale_key);
1010        assert_eq!(
1011            store.exact_prefix_cardinality(7, IndexKeyKind::User, index_id, &[collection, draft],),
1012            Some(1),
1013            "missing user index removals must not affect synchronized prefix cardinality",
1014        );
1015    }
1016
1017    #[cfg(feature = "diagnostics")]
1018    #[test]
1019    fn index_store_diagnostic_counters_record_gets_range_scans_and_entry_reads() {
1020        let mut store = IndexStore::init_heap();
1021        store.insert(raw_key(7), IndexEntryValue::presence());
1022        store.insert(raw_key(9), IndexEntryValue::presence());
1023
1024        let gets_before = IndexStore::current_get_call_count();
1025        assert_eq!(store.get(&raw_key(7)), Some(IndexEntryValue::presence()));
1026        assert_eq!(store.get(&raw_key(8)), None);
1027
1028        assert_eq!(
1029            IndexStore::current_get_call_count().saturating_sub(gets_before),
1030            2,
1031            "diagnostic index-store get counter should count both hit and miss reads",
1032        );
1033
1034        let range_scans_before = IndexStore::current_range_scan_call_count();
1035        let lower = Bound::Included(raw_key(7));
1036        let upper = Bound::Included(raw_key(9));
1037        store
1038            .visit_raw_entries_in_range((&lower, &upper), Direction::Asc, |_key, _entry| Ok(false))
1039            .expect("raw index range visit should succeed");
1040
1041        assert_eq!(
1042            IndexStore::current_range_scan_call_count().saturating_sub(range_scans_before),
1043            1,
1044            "diagnostic index-store range-scan counter should count one range traversal probe",
1045        );
1046
1047        let entries_before = IndexStore::current_entry_read_count();
1048        store
1049            .visit_entries(|_key, _entry| Ok::<_, Infallible>(IndexStoreVisit::Continue))
1050            .expect("index entry visit should succeed");
1051
1052        assert_eq!(
1053            IndexStore::current_entry_read_count().saturating_sub(entries_before),
1054            2,
1055            "diagnostic index-store entry counter should count yielded traversal entries",
1056        );
1057    }
1058
1059    #[test]
1060    fn journaled_mixed_index_range_traversal_streams_without_snapshot() {
1061        let mut store = IndexStore::init_journaled(test_memory(93));
1062        for value in [1_u8, 3, 5] {
1063            store.insert(raw_key(value), IndexEntryValue::presence());
1064        }
1065        store
1066            .fold_journaled_materialized_view()
1067            .expect("canonical index seed should fold");
1068
1069        store.insert(raw_key(0), IndexEntryValue::presence());
1070        store.insert(raw_key(4), IndexEntryValue::presence());
1071        store.insert(raw_key(5), IndexEntryValue::presence());
1072        store.remove(&raw_key(1));
1073
1074        let lower = Bound::Included(raw_key(0));
1075        let upper = Bound::Included(raw_key(5));
1076
1077        reset_journaled_snapshot_call_count_for_tests();
1078        let mut asc = Vec::new();
1079        store
1080            .visit_journaled_entries_in_range((&lower, &upper), Direction::Asc, |key, _value| {
1081                asc.push(key.as_bytes()[0]);
1082                Ok::<_, Infallible>(asc.len() == 2)
1083            })
1084            .expect("asc journaled index range traversal should succeed");
1085        assert_eq!(asc, vec![0, 3]);
1086        assert_eq!(
1087            journaled_snapshot_call_count_for_tests(),
1088            0,
1089            "mixed journaled index range traversal should preserve early stop without materializing a snapshot",
1090        );
1091
1092        reset_journaled_snapshot_call_count_for_tests();
1093        let mut desc = Vec::new();
1094        store
1095            .visit_journaled_entries_in_range((&lower, &upper), Direction::Desc, |key, _value| {
1096                desc.push(key.as_bytes()[0]);
1097                Ok::<_, Infallible>(desc.len() == 2)
1098            })
1099            .expect("desc journaled index range traversal should succeed");
1100        assert_eq!(desc, vec![5, 4]);
1101        assert_eq!(
1102            journaled_snapshot_call_count_for_tests(),
1103            0,
1104            "mixed reverse journaled index range traversal should preserve early stop without materializing a snapshot",
1105        );
1106    }
1107}