Skip to main content

icydb_core/db/index/
store.rs

1//! Module: index::store
2//! Responsibility: journaled-or-heap index-entry storage behind the index-store boundary.
3//! Does not own: range-scan resolution, continuation semantics, or predicate execution.
4//! Boundary: scan/executor layers depend on this storage boundary.
5
6use crate::db::{
7    direction::Direction,
8    index::{
9        IndexEntryValue, IndexId, IndexKeyKind, cardinality::IndexPrefixCardinality,
10        key::RawIndexStoreKey,
11    },
12    ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
13};
14
15use candid::CandidType;
16use ic_memory::stable_structures::{
17    BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
18};
19use serde::Deserialize;
20#[cfg(any(test, feature = "diagnostics"))]
21use std::cell::Cell;
22use std::collections::{BTreeMap as HeapBTreeMap, BTreeSet};
23use std::ops::Bound;
24
#[cfg(test)]
thread_local! {
    /// Test-only per-thread tally of journaled entry-snapshot materializations.
    static JOURNALED_SNAPSHOT_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
}

#[cfg(feature = "diagnostics")]
thread_local! {
    /// Per-thread tally of `IndexStore::get` calls (diagnostics builds only).
    static INDEX_STORE_GET_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
}

#[cfg(any(test, feature = "diagnostics"))]
thread_local! {
    /// Per-thread tally of index range-scan probes.
    static INDEX_STORE_RANGE_SCAN_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
    /// Per-thread tally of index entries handed to traversal visitors.
    static INDEX_STORE_ENTRY_READ_COUNT: Cell<u64> = const { Cell::new(0) };
    /// Per-thread tally of exact prefix-cardinality lookups.
    static INDEX_STORE_PREFIX_CARDINALITY_LOOKUP_COUNT: Cell<u64> = const { Cell::new(0) };
}
41
/// Add one to the per-thread `IndexStore::get` call tally, saturating at `u64::MAX`.
#[cfg(feature = "diagnostics")]
fn record_index_store_get_call() {
    let calls = INDEX_STORE_GET_CALL_COUNT.get();
    INDEX_STORE_GET_CALL_COUNT.set(calls.saturating_add(1));
}
48
/// Add one to the per-thread range-scan probe tally, saturating at `u64::MAX`.
#[cfg(any(test, feature = "diagnostics"))]
fn record_index_store_range_scan_call() {
    let calls = INDEX_STORE_RANGE_SCAN_CALL_COUNT.get();
    INDEX_STORE_RANGE_SCAN_CALL_COUNT.set(calls.saturating_add(1));
}
55
/// Add one to the per-thread traversal entry-read tally, saturating at `u64::MAX`.
#[cfg(any(test, feature = "diagnostics"))]
fn record_index_store_entry_read() {
    let reads = INDEX_STORE_ENTRY_READ_COUNT.get();
    INDEX_STORE_ENTRY_READ_COUNT.set(reads.saturating_add(1));
}
62
/// Add one to the per-thread prefix-cardinality lookup tally, saturating at `u64::MAX`.
#[cfg(any(test, feature = "diagnostics"))]
fn record_index_store_prefix_cardinality_lookup() {
    let lookups = INDEX_STORE_PREFIX_CARDINALITY_LOOKUP_COUNT.get();
    INDEX_STORE_PREFIX_CARDINALITY_LOOKUP_COUNT.set(lookups.saturating_add(1));
}
69
70fn visit_index_store_entry<E>(
71    key: &RawIndexStoreKey,
72    value: &IndexEntryValue,
73    visit: &mut impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<bool, E>,
74) -> Result<bool, E> {
75    #[cfg(any(test, feature = "diagnostics"))]
76    record_index_store_entry_read();
77
78    visit(key, value)
79}
80
/// Test-only: add one to the per-thread journaled snapshot tally, saturating at `u64::MAX`.
#[cfg(test)]
fn record_journaled_snapshot_call() {
    let snapshots = JOURNALED_SNAPSHOT_CALL_COUNT.get();
    JOURNALED_SNAPSHOT_CALL_COUNT.set(snapshots.saturating_add(1));
}
87
/// Test-only: zero the per-thread journaled snapshot tally.
#[cfg(test)]
fn reset_journaled_snapshot_call_count_for_tests() {
    JOURNALED_SNAPSHOT_CALL_COUNT.set(0);
}
92
/// Test-only: read the current per-thread journaled snapshot tally.
#[cfg(test)]
fn journaled_snapshot_call_count_for_tests() -> u64 {
    JOURNALED_SNAPSHOT_CALL_COUNT.get()
}
97
98//
99// IndexState
100//
101// Explicit lifecycle visibility state for one index store.
102// Visibility matters because planner-visible indexes must already be complete:
103// the index contents are fully built and query-visible for reads.
104//
105#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
106pub enum IndexState {
107    Building,
108    #[default]
109    Ready,
110    Dropping,
111}
112
113impl IndexState {
114    /// Return the stable lowercase text label for this lifecycle state.
115    #[must_use]
116    pub const fn as_str(self) -> &'static str {
117        match self {
118            Self::Building => "building",
119            Self::Ready => "ready",
120            Self::Dropping => "dropping",
121        }
122    }
123}
124
125///
126/// IndexStore
127///
128/// Thin persistence wrapper over one journaled or heap BTreeMap.
129///
130/// Invariant: callers provide already-validated `RawIndexStoreKey`/`IndexEntryValue`.
131///
132
133pub struct IndexStore {
134    pub(super) backend: IndexStoreBackend,
135    generation: u64,
136    state: IndexState,
137    prefix_cardinality: IndexPrefixCardinality,
138}
139
140pub(super) enum IndexStoreBackend {
141    Heap(HeapBTreeMap<RawIndexStoreKey, IndexEntryValue>),
142    Journaled {
143        canonical:
144            StableBTreeMap<RawIndexStoreKey, IndexEntryValue, VirtualMemory<DefaultMemoryImpl>>,
145        live: HeapBTreeMap<RawIndexStoreKey, IndexEntryValue>,
146        tombstones: BTreeSet<RawIndexStoreKey>,
147    },
148}
149
150/// Control-flow result for index-store traversal visitors.
151#[derive(Clone, Copy, Debug, Eq, PartialEq)]
152pub(in crate::db) enum IndexStoreVisit {
153    Continue,
154    Stop,
155}
156
157impl IndexStoreVisit {
158    const fn should_stop(self) -> bool {
159        matches!(self, Self::Stop)
160    }
161}
162
163impl IndexStore {
164    /// Initialize a volatile heap-backed index store.
165    #[must_use]
166    pub const fn init_heap() -> Self {
167        Self {
168            backend: IndexStoreBackend::Heap(HeapBTreeMap::new()),
169            generation: 0,
170            state: IndexState::Ready,
171            prefix_cardinality: IndexPrefixCardinality::synchronized_empty(),
172        }
173    }
174
175    /// Initialize a journaled cached-stable index store.
176    ///
177    /// Normal writes update only the live materialized projection. The
178    /// canonical stable index is updated by future fold/rebuild paths.
179    #[must_use]
180    pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
181        let mut store = Self {
182            backend: IndexStoreBackend::Journaled {
183                canonical: StableBTreeMap::init(memory),
184                live: HeapBTreeMap::new(),
185                tombstones: BTreeSet::new(),
186            },
187            generation: 0,
188            state: IndexState::Ready,
189            prefix_cardinality: IndexPrefixCardinality::synchronized_empty(),
190        };
191        store.rebuild_prefix_cardinality_from_entries(Some(0));
192        store
193    }
194
195    /// Visit all index entries in canonical store order without exposing the
196    /// backing stable-map iterator.
197    pub(in crate::db) fn visit_entries<E>(
198        &self,
199        mut visitor: impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<IndexStoreVisit, E>,
200    ) -> Result<(), E> {
201        match &self.backend {
202            IndexStoreBackend::Heap(map) => {
203                for (key, value) in map {
204                    #[cfg(any(test, feature = "diagnostics"))]
205                    record_index_store_entry_read();
206
207                    if visitor(key, value)?.should_stop() {
208                        return Ok(());
209                    }
210                }
211            }
212            IndexStoreBackend::Journaled {
213                canonical: _,
214                live: _,
215                tombstones: _,
216            } => self.visit_journaled_entries_in_range(
217                (&Bound::Unbounded, &Bound::Unbounded),
218                Direction::Asc,
219                |key, value| visitor(key, value).map(IndexStoreVisit::should_stop),
220            )?,
221        }
222
223        Ok(())
224    }
225
226    pub(in crate::db) fn get(&self, key: &RawIndexStoreKey) -> Option<IndexEntryValue> {
227        #[cfg(feature = "diagnostics")]
228        record_index_store_get_call();
229
230        match &self.backend {
231            IndexStoreBackend::Heap(map) => map.get(key).cloned(),
232            IndexStoreBackend::Journaled { .. } => Self::journaled_get(&self.backend, key),
233        }
234    }
235
236    pub fn len(&self) -> u64 {
237        match &self.backend {
238            IndexStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
239            IndexStoreBackend::Journaled { .. } => {
240                let mut count = 0_u64;
241                let _: Result<(), std::convert::Infallible> = self.visit_entries(|_key, _value| {
242                    count = count.saturating_add(1);
243                    Ok(IndexStoreVisit::Continue)
244                });
245                count
246            }
247        }
248    }
249
250    pub fn is_empty(&self) -> bool {
251        match &self.backend {
252            IndexStoreBackend::Heap(map) => map.is_empty(),
253            IndexStoreBackend::Journaled { .. } => {
254                let mut empty = true;
255                let _: Result<(), std::convert::Infallible> = self.visit_entries(|_key, _value| {
256                    empty = false;
257                    Ok(IndexStoreVisit::Stop)
258                });
259                empty
260            }
261        }
262    }
263
264    #[must_use]
265    pub(in crate::db) const fn generation(&self) -> u64 {
266        self.generation
267    }
268
269    /// Return the explicit lifecycle state for this index store.
270    #[must_use]
271    pub(in crate::db) const fn state(&self) -> IndexState {
272        self.state
273    }
274
275    /// Return an exact user-index prefix count when the index metadata is
276    /// synchronized with the caller's authoritative row-store generation.
277    #[must_use]
278    pub(in crate::db) fn exact_prefix_cardinality(
279        &self,
280        data_generation: u64,
281        key_kind: IndexKeyKind,
282        index_id: IndexId,
283        components: &[Vec<u8>],
284    ) -> Option<u64> {
285        #[cfg(any(test, feature = "diagnostics"))]
286        record_index_store_prefix_cardinality_lookup();
287
288        self.prefix_cardinality
289            .exact_count(data_generation, key_kind, index_id, components)
290    }
291
292    /// Return the sum of exact prefix counts for prefixes on the same index
293    /// when synchronized metadata can prove all requested counts.
294    #[must_use]
295    pub(in crate::db) fn exact_prefix_cardinality_sum<'a>(
296        &self,
297        data_generation: u64,
298        key_kind: IndexKeyKind,
299        index_id: IndexId,
300        component_prefixes: impl IntoIterator<Item = &'a [Vec<u8>]>,
301        stop_after: Option<u64>,
302    ) -> Option<u64> {
303        #[cfg(any(test, feature = "diagnostics"))]
304        record_index_store_prefix_cardinality_lookup();
305
306        self.prefix_cardinality.exact_count_sum(
307            data_generation,
308            key_kind,
309            index_id,
310            component_prefixes,
311            stop_after,
312        )
313    }
314
315    /// Mark prefix-cardinality metadata synchronized with the authoritative
316    /// row-store generation after a committed row/index transition.
317    pub(in crate::db) const fn mark_prefix_cardinality_data_generation(&mut self, generation: u64) {
318        self.prefix_cardinality.mark_synchronized(generation);
319    }
320
321    /// Mark this index store as in-progress and therefore ineligible for
322    /// planner visibility until a full authoritative rebuild ends.
323    pub(in crate::db) const fn mark_building(&mut self) {
324        self.state = IndexState::Building;
325    }
326
327    /// Mark this index store as fully built and planner-visible again.
328    pub(in crate::db) const fn mark_ready(&mut self) {
329        self.state = IndexState::Ready;
330    }
331
332    /// Mark this index store as dropping and therefore not planner-visible.
333    pub(in crate::db) const fn mark_dropping(&mut self) {
334        self.state = IndexState::Dropping;
335    }
336
337    pub(crate) fn insert(
338        &mut self,
339        key: RawIndexStoreKey,
340        entry: IndexEntryValue,
341    ) -> Option<IndexEntryValue> {
342        let previous_journaled = if matches!(self.backend, IndexStoreBackend::Journaled { .. }) {
343            self.get(&key)
344        } else {
345            None
346        };
347        let cardinality_key = key.clone();
348        let previous = match &mut self.backend {
349            IndexStoreBackend::Heap(map) => map.insert(key, entry.clone()),
350            IndexStoreBackend::Journaled {
351                live, tombstones, ..
352            } => {
353                tombstones.remove(&key);
354                live.insert(key, entry.clone());
355                previous_journaled
356            }
357        };
358        self.prefix_cardinality
359            .apply_insert(&cardinality_key, previous.as_ref(), &entry);
360        self.bump_generation();
361        previous
362    }
363
364    pub(crate) fn remove(&mut self, key: &RawIndexStoreKey) -> Option<IndexEntryValue> {
365        let previous_journaled = if matches!(self.backend, IndexStoreBackend::Journaled { .. }) {
366            self.get(key)
367        } else {
368            None
369        };
370        let previous = match &mut self.backend {
371            IndexStoreBackend::Heap(map) => map.remove(key),
372            IndexStoreBackend::Journaled {
373                live, tombstones, ..
374            } => {
375                live.remove(key);
376                tombstones.insert(key.clone());
377                previous_journaled
378            }
379        };
380        self.prefix_cardinality.apply_remove(key, previous.as_ref());
381        self.bump_generation();
382        previous
383    }
384
385    pub fn clear(&mut self) {
386        match &mut self.backend {
387            IndexStoreBackend::Heap(map) => map.clear(),
388            IndexStoreBackend::Journaled {
389                canonical,
390                live,
391                tombstones,
392            } => {
393                live.clear();
394                tombstones.clear();
395                for entry in canonical.iter() {
396                    tombstones.insert(entry.key().clone());
397                }
398            }
399        }
400        self.prefix_cardinality.clear_unsynchronized();
401        self.bump_generation();
402    }
403
404    /// Fold the current journaled materialized index view into the canonical
405    /// stable base and clear volatile projection state.
406    pub(in crate::db) fn fold_journaled_materialized_view(
407        &mut self,
408    ) -> Result<(), crate::error::InternalError> {
409        let entries = Self::journaled_entries_snapshot_for_fold(&self.backend);
410        let IndexStoreBackend::Journaled {
411            canonical,
412            live,
413            tombstones,
414        } = &mut self.backend
415        else {
416            return Err(crate::error::InternalError::store_invariant());
417        };
418
419        canonical.clear_new();
420        for (key, value) in entries {
421            canonical.insert(key, value);
422        }
423        live.clear();
424        tombstones.clear();
425        let data_generation = self.prefix_cardinality.synchronized_generation();
426        self.rebuild_prefix_cardinality_from_entries(data_generation);
427        self.bump_generation();
428
429        Ok(())
430    }
431
432    /// Sum of bytes used by all stored index entries.
433    pub fn memory_bytes(&self) -> u64 {
434        let mut bytes = 0u64;
435        let _: Result<(), std::convert::Infallible> = self.visit_entries(|key, value| {
436            bytes = bytes.saturating_add(key.as_bytes().len() as u64 + value.len() as u64);
437            Ok(IndexStoreVisit::Continue)
438        });
439        bytes
440    }
441
442    /// Return the monotonic perf-only count of index-entry fetches seen by this process.
443    #[cfg(feature = "diagnostics")]
444    pub(in crate::db) fn current_get_call_count() -> u64 {
445        INDEX_STORE_GET_CALL_COUNT.with(Cell::get)
446    }
447
448    /// Return the monotonic perf-only count of index range traversal probes seen by this process.
449    #[cfg(any(test, feature = "diagnostics"))]
450    pub(in crate::db) fn current_range_scan_call_count() -> u64 {
451        INDEX_STORE_RANGE_SCAN_CALL_COUNT.with(Cell::get)
452    }
453
454    /// Return the monotonic perf-only count of index entries yielded by traversal.
455    #[cfg(any(test, feature = "diagnostics"))]
456    pub(in crate::db) fn current_entry_read_count() -> u64 {
457        INDEX_STORE_ENTRY_READ_COUNT.with(Cell::get)
458    }
459
460    /// Return the monotonic perf-only count of exact prefix-cardinality probes.
461    #[cfg(test)]
462    #[allow(dead_code)]
463    pub(in crate::db) fn current_prefix_cardinality_lookup_count() -> u64 {
464        INDEX_STORE_PREFIX_CARDINALITY_LOOKUP_COUNT.with(Cell::get)
465    }
466
467    #[cfg(any(test, feature = "diagnostics"))]
468    pub(in crate::db::index) fn record_range_scan_call() {
469        record_index_store_range_scan_call();
470    }
471
472    const fn bump_generation(&mut self) {
473        self.generation = self.generation.saturating_add(1);
474    }
475
476    fn rebuild_prefix_cardinality_from_entries(&mut self, data_generation: Option<u64>) {
477        self.prefix_cardinality.clear_unsynchronized();
478        let entries = Self::entries_snapshot_for_cardinality(&self.backend);
479        for (key, value) in &entries {
480            self.prefix_cardinality.apply_insert(key, None, value);
481        }
482        if let Some(data_generation) = data_generation {
483            self.prefix_cardinality.mark_synchronized(data_generation);
484        }
485    }
486
487    fn entries_snapshot_for_cardinality(
488        backend: &IndexStoreBackend,
489    ) -> HeapBTreeMap<RawIndexStoreKey, IndexEntryValue> {
490        match backend {
491            IndexStoreBackend::Heap(map) => map.clone(),
492            IndexStoreBackend::Journaled { .. } => {
493                Self::journaled_entries_snapshot_for_fold(backend)
494            }
495        }
496    }
497
498    #[cfg(test)]
499    #[must_use]
500    pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
501        match &self.backend {
502            IndexStoreBackend::Journaled { canonical: map, .. } => map.len(),
503            IndexStoreBackend::Heap(_) => 0,
504        }
505    }
506
507    fn journaled_get(
508        backend: &IndexStoreBackend,
509        key: &RawIndexStoreKey,
510    ) -> Option<IndexEntryValue> {
511        let IndexStoreBackend::Journaled {
512            canonical,
513            live,
514            tombstones,
515        } = backend
516        else {
517            return None;
518        };
519
520        if tombstones.contains(key) {
521            return None;
522        }
523        live.get(key).cloned().or_else(|| canonical.get(key))
524    }
525
526    pub(super) fn journaled_entries_snapshot_for_fold(
527        backend: &IndexStoreBackend,
528    ) -> HeapBTreeMap<RawIndexStoreKey, IndexEntryValue> {
529        #[cfg(test)]
530        record_journaled_snapshot_call();
531
532        let IndexStoreBackend::Journaled {
533            canonical,
534            live,
535            tombstones,
536        } = backend
537        else {
538            return HeapBTreeMap::new();
539        };
540
541        let mut entries = HeapBTreeMap::new();
542        for entry in canonical.iter() {
543            let key = entry.key().clone();
544            if !tombstones.contains(&key) {
545                entries.insert(key, entry.value());
546            }
547        }
548        for (key, value) in live {
549            if !tombstones.contains(key) {
550                entries.insert(key.clone(), value.clone());
551            }
552        }
553
554        entries
555    }
556
557    pub(super) fn visit_journaled_entries_in_range<E>(
558        &self,
559        bounds: (&Bound<RawIndexStoreKey>, &Bound<RawIndexStoreKey>),
560        direction: Direction,
561        mut visit: impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<bool, E>,
562    ) -> Result<(), E> {
563        let IndexStoreBackend::Journaled {
564            canonical,
565            live,
566            tombstones,
567        } = &self.backend
568        else {
569            return Ok(());
570        };
571
572        let lower = bounds.0.clone();
573        let upper = bounds.1.clone();
574        match direction {
575            Direction::Asc if canonical.is_empty() => {
576                for (key, value) in live.range((lower, upper)) {
577                    if visit_index_store_entry(key, value, &mut visit)? {
578                        return Ok(());
579                    }
580                }
581            }
582            Direction::Desc if canonical.is_empty() => {
583                for (key, value) in live.range((lower, upper)).rev() {
584                    if visit_index_store_entry(key, value, &mut visit)? {
585                        return Ok(());
586                    }
587                }
588            }
589            Direction::Asc if live.is_empty() && tombstones.is_empty() => {
590                for entry in canonical.range((lower, upper)) {
591                    if visit_index_store_entry(entry.key(), &entry.value(), &mut visit)? {
592                        return Ok(());
593                    }
594                }
595            }
596            Direction::Desc if live.is_empty() && tombstones.is_empty() => {
597                for entry in canonical.range((lower, upper)).rev() {
598                    if visit_index_store_entry(entry.key(), &entry.value(), &mut visit)? {
599                        return Ok(());
600                    }
601                }
602            }
603            Direction::Asc => {
604                visit_ordered_overlay(
605                    canonical.range((lower.clone(), upper.clone())),
606                    live.range((lower, upper)),
607                    direction,
608                    |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
609                    |canonical_entry| !tombstones.contains(canonical_entry.key()),
610                    |live_entry| !tombstones.contains(live_entry.0),
611                    |entry| {
612                        let should_stop = match entry {
613                            OrderedOverlayEntry::Canonical(canonical_entry) => {
614                                visit_index_store_entry(
615                                    canonical_entry.key(),
616                                    &canonical_entry.value(),
617                                    &mut visit,
618                                )?
619                            }
620                            OrderedOverlayEntry::Live((key, value)) => {
621                                visit_index_store_entry(key, value, &mut visit)?
622                            }
623                        };
624                        Ok(if should_stop {
625                            OrderedOverlayVisit::Stop
626                        } else {
627                            OrderedOverlayVisit::Continue
628                        })
629                    },
630                )?;
631            }
632            Direction::Desc => {
633                visit_ordered_overlay(
634                    canonical.range((lower.clone(), upper.clone())).rev(),
635                    live.range((lower, upper)).rev(),
636                    direction,
637                    |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
638                    |canonical_entry| !tombstones.contains(canonical_entry.key()),
639                    |live_entry| !tombstones.contains(live_entry.0),
640                    |entry| {
641                        let should_stop = match entry {
642                            OrderedOverlayEntry::Canonical(canonical_entry) => {
643                                visit_index_store_entry(
644                                    canonical_entry.key(),
645                                    &canonical_entry.value(),
646                                    &mut visit,
647                                )?
648                            }
649                            OrderedOverlayEntry::Live((key, value)) => {
650                                visit_index_store_entry(key, value, &mut visit)?
651                            }
652                        };
653                        Ok(if should_stop {
654                            OrderedOverlayVisit::Stop
655                        } else {
656                            OrderedOverlayVisit::Continue
657                        })
658                    },
659                )?;
660            }
661        }
662
663        Ok(())
664    }
665}
666
667#[cfg(test)]
668mod tests {
669    use super::*;
670    use crate::{
671        db::{
672            direction::Direction,
673            index::{IndexId, IndexKey, IndexKeyKind},
674            key_taxonomy::{PrimaryKeyComponent, PrimaryKeyValue},
675        },
676        testing::test_memory,
677        traits::Storable,
678        types::EntityTag,
679    };
680    use std::{borrow::Cow, convert::Infallible};
681
682    fn raw_key(value: u8) -> RawIndexStoreKey {
683        <RawIndexStoreKey as Storable>::from_bytes(Cow::Owned(vec![value]))
684    }
685
686    fn indexed_raw_key(
687        index_id: &IndexId,
688        components: Vec<Vec<u8>>,
689        primary_key: u64,
690    ) -> RawIndexStoreKey {
691        indexed_raw_key_with_kind(index_id, IndexKeyKind::User, components, primary_key)
692    }
693
694    fn indexed_raw_key_with_kind(
695        index_id: &IndexId,
696        key_kind: IndexKeyKind,
697        components: Vec<Vec<u8>>,
698        primary_key: u64,
699    ) -> RawIndexStoreKey {
700        IndexKey::new_from_components_with_primary_key_value(
701            index_id,
702            key_kind,
703            components.as_slice(),
704            &PrimaryKeyValue::from(PrimaryKeyComponent::Nat64(primary_key)),
705        )
706        .to_raw()
707    }
708
709    fn malformed_index_entry_value() -> IndexEntryValue {
710        <IndexEntryValue as Storable>::from_bytes(Cow::Owned(vec![0xFF]))
711    }
712
713    fn missing_index_entry_value() -> IndexEntryValue {
714        <IndexEntryValue as Storable>::from_bytes(Cow::Owned(vec![1]))
715    }
716
717    #[test]
718    fn index_prefix_cardinality_requires_explicit_data_generation_sync() {
719        let index_id = IndexId::new(EntityTag::new(0xCA7D), 1);
720        let collection = b"collection-a".to_vec();
721        let draft = b"Draft".to_vec();
722        let review = b"Review".to_vec();
723        let mut store = IndexStore::init_heap();
724
725        store.insert(
726            indexed_raw_key(&index_id, vec![collection.clone(), draft.clone()], 1),
727            IndexEntryValue::presence(),
728        );
729        store.insert(
730            indexed_raw_key(&index_id, vec![collection.clone(), draft.clone()], 2),
731            IndexEntryValue::presence(),
732        );
733        store.insert(
734            indexed_raw_key(&index_id, vec![collection.clone(), review.clone()], 3),
735            IndexEntryValue::presence(),
736        );
737
738        assert_eq!(
739            store.exact_prefix_cardinality(
740                0,
741                IndexKeyKind::User,
742                index_id,
743                std::slice::from_ref(&collection),
744            ),
745            None,
746            "raw index mutations must not be trusted until row generation sync is stamped",
747        );
748
749        store.mark_prefix_cardinality_data_generation(7);
750
751        assert_eq!(
752            store.exact_prefix_cardinality(
753                7,
754                IndexKeyKind::User,
755                index_id,
756                std::slice::from_ref(&collection),
757            ),
758            Some(3),
759        );
760        assert_eq!(
761            store.exact_prefix_cardinality(
762                7,
763                IndexKeyKind::User,
764                index_id,
765                &[collection.clone(), draft],
766            ),
767            Some(2),
768        );
769        assert_eq!(
770            store.exact_prefix_cardinality(8, IndexKeyKind::User, index_id, &[collection, review],),
771            None,
772            "row generation drift should force the caller to use the existing-row fallback",
773        );
774    }
775
776    #[test]
777    fn index_prefix_cardinality_ignores_system_index_mutations() {
778        let user_index_id = IndexId::new(EntityTag::new(0xCA7D), 1);
779        let system_index_id = IndexId::new(EntityTag::new(0xCA7D), 2);
780        let collection = b"collection-a".to_vec();
781        let draft = b"Draft".to_vec();
782        let system_component = b"reverse-edge".to_vec();
783        let mut store = IndexStore::init_heap();
784
785        store.insert(
786            indexed_raw_key(&user_index_id, vec![collection.clone(), draft.clone()], 1),
787            IndexEntryValue::presence(),
788        );
789        store.mark_prefix_cardinality_data_generation(7);
790
791        assert_eq!(
792            store.exact_prefix_cardinality(
793                7,
794                IndexKeyKind::User,
795                user_index_id,
796                &[collection.clone(), draft.clone()],
797            ),
798            Some(1),
799        );
800
801        let system_key = indexed_raw_key_with_kind(
802            &system_index_id,
803            IndexKeyKind::System,
804            vec![system_component],
805            1,
806        );
807        store.insert(system_key.clone(), IndexEntryValue::presence());
808        assert_eq!(
809            store.exact_prefix_cardinality(
810                7,
811                IndexKeyKind::User,
812                user_index_id,
813                &[collection.clone(), draft.clone()],
814            ),
815            Some(1),
816            "system index writes must not invalidate synchronized user-prefix cardinality",
817        );
818
819        store.remove(&system_key);
820        assert_eq!(
821            store.exact_prefix_cardinality(
822                7,
823                IndexKeyKind::User,
824                user_index_id,
825                &[collection.clone(), draft.clone()],
826            ),
827            Some(1),
828            "system index removals must not invalidate synchronized user-prefix cardinality",
829        );
830
831        let malformed_system_key = indexed_raw_key_with_kind(
832            &system_index_id,
833            IndexKeyKind::System,
834            vec![b"malformed-reverse-edge".to_vec()],
835            2,
836        );
837        store.insert(malformed_system_key.clone(), malformed_index_entry_value());
838        assert_eq!(
839            store.exact_prefix_cardinality(
840                7,
841                IndexKeyKind::User,
842                user_index_id,
843                &[collection.clone(), draft.clone()],
844            ),
845            Some(1),
846            "malformed system index payloads must not invalidate user-prefix cardinality",
847        );
848
849        store.remove(&malformed_system_key);
850        assert_eq!(
851            store.exact_prefix_cardinality(
852                7,
853                IndexKeyKind::User,
854                user_index_id,
855                &[collection.clone(), draft],
856            ),
857            Some(1),
858            "malformed system index removals must not invalidate user-prefix cardinality",
859        );
860
861        let review = b"Review".to_vec();
862        store.insert(
863            indexed_raw_key(&user_index_id, vec![collection.clone(), review.clone()], 2),
864            IndexEntryValue::presence(),
865        );
866        assert_eq!(
867            store.exact_prefix_cardinality(
868                7,
869                IndexKeyKind::User,
870                user_index_id,
871                &[collection, review]
872            ),
873            None,
874            "user-prefix count changes must still require a fresh row-generation stamp",
875        );
876    }
877
// Checks that inserting and then removing a user-index key whose value comes from
// `missing_index_entry_value()` leaves the exact prefix cardinality queried at
// generation 7 unchanged at `Some(1)`.
#[test]
fn index_prefix_cardinality_ignores_missing_user_index_mutations() {
    let index_id = IndexId::new(EntityTag::new(0xCA7D), 1);
    let prefix = [b"collection-a".to_vec(), b"Draft".to_vec()];
    let mut store = IndexStore::init_heap();

    // Seed one presence entry under the prefix, then stamp generation 7.
    let live_key = indexed_raw_key(&index_id, prefix.to_vec(), 1);
    store.insert(live_key, IndexEntryValue::presence());
    store.mark_prefix_cardinality_data_generation(7);

    // Same prefix components, different trailing argument, stored with the "missing" value.
    let stale_key = indexed_raw_key(&index_id, prefix.to_vec(), 2);
    store.insert(stale_key.clone(), missing_index_entry_value());
    let count_after_insert =
        store.exact_prefix_cardinality(7, IndexKeyKind::User, index_id, &prefix);
    assert_eq!(
        count_after_insert,
        Some(1),
        "missing user index entries must not affect synchronized prefix cardinality",
    );

    // Removing that same key must leave the reported count untouched as well.
    store.remove(&stale_key);
    let count_after_remove =
        store.exact_prefix_cardinality(7, IndexKeyKind::User, index_id, &prefix);
    assert_eq!(
        count_after_remove,
        Some(1),
        "missing user index removals must not affect synchronized prefix cardinality",
    );
}
911
// Diagnostics-only test: takes a baseline of each counter, performs a known number of
// operations on a heap-backed store, and asserts the counter advanced by that amount.
#[cfg(feature = "diagnostics")]
#[test]
fn index_store_diagnostic_counters_record_gets_range_scans_and_entry_reads() {
    let mut store = IndexStore::init_heap();
    for seed in [7_u8, 9] {
        store.insert(raw_key(seed), IndexEntryValue::presence());
    }

    // Point reads: one key that was inserted (7) and one that was not (8).
    let gets_baseline = IndexStore::current_get_call_count();
    let present = store.get(&raw_key(7));
    assert_eq!(present, Some(IndexEntryValue::presence()));
    let absent = store.get(&raw_key(8));
    assert_eq!(absent, None);

    let gets_recorded = IndexStore::current_get_call_count().saturating_sub(gets_baseline);
    assert_eq!(
        gets_recorded, 2,
        "diagnostic index-store get counter should count both hit and miss reads",
    );

    // A single range visit over [7, 9]; the visitor always returns `Ok(false)`.
    let scans_baseline = IndexStore::current_range_scan_call_count();
    let lower = Bound::Included(raw_key(7));
    let upper = Bound::Included(raw_key(9));
    store
        .visit_raw_entries_in_range((&lower, &upper), Direction::Asc, |_raw, _value| Ok(false))
        .expect("raw index range visit should succeed");

    let scans_recorded =
        IndexStore::current_range_scan_call_count().saturating_sub(scans_baseline);
    assert_eq!(
        scans_recorded, 1,
        "diagnostic index-store range-scan counter should count one range traversal probe",
    );

    // Full traversal that never stops early; both inserted entries are expected to be counted.
    let reads_baseline = IndexStore::current_entry_read_count();
    store
        .visit_entries(|_raw, _value| Ok::<_, Infallible>(IndexStoreVisit::Continue))
        .expect("index entry visit should succeed");

    let reads_recorded = IndexStore::current_entry_read_count().saturating_sub(reads_baseline);
    assert_eq!(
        reads_recorded, 2,
        "diagnostic index-store entry counter should count yielded traversal entries",
    );
}
953
// Seeds a journaled store, folds it, applies further inserts and a removal, and then
// walks the range [0, 5] in both directions. Each walk's visitor returns `true` once it
// has collected two keys, and the test asserts the snapshot call counter stayed at zero.
#[test]
fn journaled_mixed_index_range_traversal_streams_without_snapshot() {
    let mut store = IndexStore::init_journaled(test_memory(93));
    for seed in [1_u8, 3, 5] {
        store.insert(raw_key(seed), IndexEntryValue::presence());
    }
    store
        .fold_journaled_materialized_view()
        .expect("canonical index seed should fold");

    // Mutations applied after the fold: keys 0 and 4 are new, key 5 is written again,
    // and key 1 is removed.
    for pending in [0_u8, 4, 5] {
        store.insert(raw_key(pending), IndexEntryValue::presence());
    }
    store.remove(&raw_key(1));

    let lower = Bound::Included(raw_key(0));
    let upper = Bound::Included(raw_key(5));
    let window = (&lower, &upper);

    // Ascending walk: expects keys 0 and 3 (key 1 was removed above).
    reset_journaled_snapshot_call_count_for_tests();
    let mut forward_keys = Vec::new();
    store
        .visit_journaled_entries_in_range(window, Direction::Asc, |key, _value| {
            let first_byte = key.as_bytes()[0];
            forward_keys.push(first_byte);
            let collected_two = forward_keys.len() == 2;
            Ok::<_, Infallible>(collected_two)
        })
        .expect("asc journaled index range traversal should succeed");
    assert_eq!(forward_keys, vec![0, 3]);
    let snapshots_during_asc = journaled_snapshot_call_count_for_tests();
    assert_eq!(
        snapshots_during_asc, 0,
        "mixed journaled index range traversal should preserve early stop without materializing a snapshot",
    );

    // Descending walk over the same window: expects keys 5 and 4.
    reset_journaled_snapshot_call_count_for_tests();
    let mut reverse_keys = Vec::new();
    store
        .visit_journaled_entries_in_range(window, Direction::Desc, |key, _value| {
            let first_byte = key.as_bytes()[0];
            reverse_keys.push(first_byte);
            let collected_two = reverse_keys.len() == 2;
            Ok::<_, Infallible>(collected_two)
        })
        .expect("desc journaled index range traversal should succeed");
    assert_eq!(reverse_keys, vec![5, 4]);
    let snapshots_during_desc = journaled_snapshot_call_count_for_tests();
    assert_eq!(
        snapshots_during_desc, 0,
        "mixed reverse journaled index range traversal should preserve early stop without materializing a snapshot",
    );
}
1002}