Skip to main content

icydb_core/db/index/
store.rs

1//! Module: index::store
2//! Responsibility: journaled-or-heap index-entry storage behind the index-store boundary.
3//! Does not own: range-scan resolution, continuation semantics, or predicate execution.
4//! Boundary: scan/executor layers depend on this storage boundary.
5
6use crate::db::{
7    direction::Direction,
8    index::{
9        IndexEntryValue, IndexId, IndexKeyKind, cardinality::IndexPrefixCardinality,
10        key::RawIndexStoreKey,
11    },
12    ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
13};
14
15use candid::CandidType;
16use ic_memory::stable_structures::{
17    BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
18};
19use serde::Deserialize;
20#[cfg(any(test, feature = "diagnostics"))]
21use std::cell::Cell;
22use std::collections::{BTreeMap as HeapBTreeMap, BTreeSet};
23use std::ops::Bound;
24
25#[cfg(test)]
26thread_local! {
27    static JOURNALED_SNAPSHOT_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
28}
29
30#[cfg(feature = "diagnostics")]
31thread_local! {
32    static INDEX_STORE_GET_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
33    static INDEX_STORE_ENTRY_READ_COUNT: Cell<u64> = const { Cell::new(0) };
34}
35
36#[cfg(feature = "diagnostics")]
37fn record_index_store_get_call() {
38    INDEX_STORE_GET_CALL_COUNT.with(|count| {
39        count.set(count.get().saturating_add(1));
40    });
41}
42
43#[cfg(feature = "diagnostics")]
44fn record_index_store_entry_read() {
45    INDEX_STORE_ENTRY_READ_COUNT.with(|count| {
46        count.set(count.get().saturating_add(1));
47    });
48}
49
50fn visit_index_store_entry<E>(
51    key: &RawIndexStoreKey,
52    value: &IndexEntryValue,
53    visit: &mut impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<bool, E>,
54) -> Result<bool, E> {
55    #[cfg(feature = "diagnostics")]
56    record_index_store_entry_read();
57
58    visit(key, value)
59}
60
61#[cfg(test)]
62fn record_journaled_snapshot_call() {
63    JOURNALED_SNAPSHOT_CALL_COUNT.with(|count| {
64        count.set(count.get().saturating_add(1));
65    });
66}
67
68#[cfg(test)]
69fn reset_journaled_snapshot_call_count_for_tests() {
70    JOURNALED_SNAPSHOT_CALL_COUNT.with(|count| count.set(0));
71}
72
73#[cfg(test)]
74fn journaled_snapshot_call_count_for_tests() -> u64 {
75    JOURNALED_SNAPSHOT_CALL_COUNT.with(Cell::get)
76}
77
78//
79// IndexState
80//
81// Explicit lifecycle visibility state for one index store.
82// Visibility matters because planner-visible indexes must already be complete:
83// the index contents are fully built and query-visible for reads.
84//
85#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
86pub enum IndexState {
87    Building,
88    #[default]
89    Ready,
90    Dropping,
91}
92
93impl IndexState {
94    /// Return the stable lowercase text label for this lifecycle state.
95    #[must_use]
96    pub const fn as_str(self) -> &'static str {
97        match self {
98            Self::Building => "building",
99            Self::Ready => "ready",
100            Self::Dropping => "dropping",
101        }
102    }
103}
104
105///
106/// IndexStore
107///
108/// Thin persistence wrapper over one journaled or heap BTreeMap.
109///
110/// Invariant: callers provide already-validated `RawIndexStoreKey`/`IndexEntryValue`.
111///
112
113pub struct IndexStore {
114    pub(super) backend: IndexStoreBackend,
115    generation: u64,
116    state: IndexState,
117    prefix_cardinality: IndexPrefixCardinality,
118}
119
120pub(super) enum IndexStoreBackend {
121    Heap(HeapBTreeMap<RawIndexStoreKey, IndexEntryValue>),
122    Journaled {
123        canonical:
124            StableBTreeMap<RawIndexStoreKey, IndexEntryValue, VirtualMemory<DefaultMemoryImpl>>,
125        live: HeapBTreeMap<RawIndexStoreKey, IndexEntryValue>,
126        tombstones: BTreeSet<RawIndexStoreKey>,
127    },
128}
129
130/// Control-flow result for index-store traversal visitors.
131#[derive(Clone, Copy, Debug, Eq, PartialEq)]
132pub(in crate::db) enum IndexStoreVisit {
133    Continue,
134    Stop,
135}
136
137impl IndexStoreVisit {
138    const fn should_stop(self) -> bool {
139        matches!(self, Self::Stop)
140    }
141}
142
143impl IndexStore {
144    /// Initialize a volatile heap-backed index store.
145    #[must_use]
146    pub const fn init_heap() -> Self {
147        Self {
148            backend: IndexStoreBackend::Heap(HeapBTreeMap::new()),
149            generation: 0,
150            state: IndexState::Ready,
151            prefix_cardinality: IndexPrefixCardinality::synchronized_empty(),
152        }
153    }
154
155    /// Initialize a journaled cached-stable index store.
156    ///
157    /// Normal writes update only the live materialized projection. The
158    /// canonical stable index is updated by future fold/rebuild paths.
159    #[must_use]
160    pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
161        let mut store = Self {
162            backend: IndexStoreBackend::Journaled {
163                canonical: StableBTreeMap::init(memory),
164                live: HeapBTreeMap::new(),
165                tombstones: BTreeSet::new(),
166            },
167            generation: 0,
168            state: IndexState::Ready,
169            prefix_cardinality: IndexPrefixCardinality::synchronized_empty(),
170        };
171        store.rebuild_prefix_cardinality_from_entries(Some(0));
172        store
173    }
174
175    /// Visit all index entries in canonical store order without exposing the
176    /// backing stable-map iterator.
177    pub(in crate::db) fn visit_entries<E>(
178        &self,
179        mut visitor: impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<IndexStoreVisit, E>,
180    ) -> Result<(), E> {
181        match &self.backend {
182            IndexStoreBackend::Heap(map) => {
183                for (key, value) in map {
184                    #[cfg(feature = "diagnostics")]
185                    record_index_store_entry_read();
186
187                    if visitor(key, value)?.should_stop() {
188                        return Ok(());
189                    }
190                }
191            }
192            IndexStoreBackend::Journaled {
193                canonical: _,
194                live: _,
195                tombstones: _,
196            } => self.visit_journaled_entries_in_range(
197                (&Bound::Unbounded, &Bound::Unbounded),
198                Direction::Asc,
199                |key, value| visitor(key, value).map(IndexStoreVisit::should_stop),
200            )?,
201        }
202
203        Ok(())
204    }
205
206    pub(in crate::db) fn get(&self, key: &RawIndexStoreKey) -> Option<IndexEntryValue> {
207        #[cfg(feature = "diagnostics")]
208        record_index_store_get_call();
209
210        match &self.backend {
211            IndexStoreBackend::Heap(map) => map.get(key).cloned(),
212            IndexStoreBackend::Journaled { .. } => Self::journaled_get(&self.backend, key),
213        }
214    }
215
216    pub fn len(&self) -> u64 {
217        match &self.backend {
218            IndexStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
219            IndexStoreBackend::Journaled { .. } => {
220                let mut count = 0_u64;
221                let _: Result<(), std::convert::Infallible> = self.visit_entries(|_key, _value| {
222                    count = count.saturating_add(1);
223                    Ok(IndexStoreVisit::Continue)
224                });
225                count
226            }
227        }
228    }
229
230    pub fn is_empty(&self) -> bool {
231        match &self.backend {
232            IndexStoreBackend::Heap(map) => map.is_empty(),
233            IndexStoreBackend::Journaled { .. } => {
234                let mut empty = true;
235                let _: Result<(), std::convert::Infallible> = self.visit_entries(|_key, _value| {
236                    empty = false;
237                    Ok(IndexStoreVisit::Stop)
238                });
239                empty
240            }
241        }
242    }
243
244    #[must_use]
245    pub(in crate::db) const fn generation(&self) -> u64 {
246        self.generation
247    }
248
249    /// Return the explicit lifecycle state for this index store.
250    #[must_use]
251    pub(in crate::db) const fn state(&self) -> IndexState {
252        self.state
253    }
254
255    /// Return an exact user-index prefix count when the index metadata is
256    /// synchronized with the caller's authoritative row-store generation.
257    #[must_use]
258    pub(in crate::db) fn exact_prefix_cardinality(
259        &self,
260        data_generation: u64,
261        key_kind: IndexKeyKind,
262        index_id: IndexId,
263        components: &[Vec<u8>],
264    ) -> Option<u64> {
265        self.prefix_cardinality
266            .exact_count(data_generation, key_kind, index_id, components)
267    }
268
269    /// Mark prefix-cardinality metadata synchronized with the authoritative
270    /// row-store generation after a committed row/index transition.
271    pub(in crate::db) const fn mark_prefix_cardinality_data_generation(&mut self, generation: u64) {
272        self.prefix_cardinality.mark_synchronized(generation);
273    }
274
275    /// Mark this index store as in-progress and therefore ineligible for
276    /// planner visibility until a full authoritative rebuild ends.
277    pub(in crate::db) const fn mark_building(&mut self) {
278        self.state = IndexState::Building;
279    }
280
281    /// Mark this index store as fully built and planner-visible again.
282    pub(in crate::db) const fn mark_ready(&mut self) {
283        self.state = IndexState::Ready;
284    }
285
286    /// Mark this index store as dropping and therefore not planner-visible.
287    pub(in crate::db) const fn mark_dropping(&mut self) {
288        self.state = IndexState::Dropping;
289    }
290
291    pub(crate) fn insert(
292        &mut self,
293        key: RawIndexStoreKey,
294        entry: IndexEntryValue,
295    ) -> Option<IndexEntryValue> {
296        let previous_journaled = if matches!(self.backend, IndexStoreBackend::Journaled { .. }) {
297            self.get(&key)
298        } else {
299            None
300        };
301        let cardinality_key = key.clone();
302        let previous = match &mut self.backend {
303            IndexStoreBackend::Heap(map) => map.insert(key, entry.clone()),
304            IndexStoreBackend::Journaled {
305                live, tombstones, ..
306            } => {
307                tombstones.remove(&key);
308                live.insert(key, entry.clone());
309                previous_journaled
310            }
311        };
312        self.prefix_cardinality
313            .apply_insert(&cardinality_key, previous.as_ref(), &entry);
314        self.bump_generation();
315        previous
316    }
317
318    pub(crate) fn remove(&mut self, key: &RawIndexStoreKey) -> Option<IndexEntryValue> {
319        let previous_journaled = if matches!(self.backend, IndexStoreBackend::Journaled { .. }) {
320            self.get(key)
321        } else {
322            None
323        };
324        let previous = match &mut self.backend {
325            IndexStoreBackend::Heap(map) => map.remove(key),
326            IndexStoreBackend::Journaled {
327                live, tombstones, ..
328            } => {
329                live.remove(key);
330                tombstones.insert(key.clone());
331                previous_journaled
332            }
333        };
334        self.prefix_cardinality.apply_remove(key, previous.as_ref());
335        self.bump_generation();
336        previous
337    }
338
339    pub fn clear(&mut self) {
340        match &mut self.backend {
341            IndexStoreBackend::Heap(map) => map.clear(),
342            IndexStoreBackend::Journaled {
343                canonical,
344                live,
345                tombstones,
346            } => {
347                live.clear();
348                tombstones.clear();
349                for entry in canonical.iter() {
350                    tombstones.insert(entry.key().clone());
351                }
352            }
353        }
354        self.prefix_cardinality.clear_unsynchronized();
355        self.bump_generation();
356    }
357
358    /// Fold the current journaled materialized index view into the canonical
359    /// stable base and clear volatile projection state.
360    pub(in crate::db) fn fold_journaled_materialized_view(
361        &mut self,
362    ) -> Result<(), crate::error::InternalError> {
363        let entries = Self::journaled_entries_snapshot_for_fold(&self.backend);
364        let IndexStoreBackend::Journaled {
365            canonical,
366            live,
367            tombstones,
368        } = &mut self.backend
369        else {
370            return Err(crate::error::InternalError::store_invariant());
371        };
372
373        canonical.clear_new();
374        for (key, value) in entries {
375            canonical.insert(key, value);
376        }
377        live.clear();
378        tombstones.clear();
379        let data_generation = self.prefix_cardinality.synchronized_generation();
380        self.rebuild_prefix_cardinality_from_entries(data_generation);
381        self.bump_generation();
382
383        Ok(())
384    }
385
386    /// Sum of bytes used by all stored index entries.
387    pub fn memory_bytes(&self) -> u64 {
388        let mut bytes = 0u64;
389        let _: Result<(), std::convert::Infallible> = self.visit_entries(|key, value| {
390            bytes = bytes.saturating_add(key.as_bytes().len() as u64 + value.len() as u64);
391            Ok(IndexStoreVisit::Continue)
392        });
393        bytes
394    }
395
396    /// Return the monotonic perf-only count of index-entry fetches seen by this process.
397    #[cfg(feature = "diagnostics")]
398    pub(in crate::db) fn current_get_call_count() -> u64 {
399        INDEX_STORE_GET_CALL_COUNT.with(Cell::get)
400    }
401
402    /// Return the monotonic perf-only count of index entries yielded by traversal.
403    #[cfg(feature = "diagnostics")]
404    pub(in crate::db) fn current_entry_read_count() -> u64 {
405        INDEX_STORE_ENTRY_READ_COUNT.with(Cell::get)
406    }
407
408    const fn bump_generation(&mut self) {
409        self.generation = self.generation.saturating_add(1);
410    }
411
412    fn rebuild_prefix_cardinality_from_entries(&mut self, data_generation: Option<u64>) {
413        self.prefix_cardinality.clear_unsynchronized();
414        let entries = Self::entries_snapshot_for_cardinality(&self.backend);
415        for (key, value) in &entries {
416            self.prefix_cardinality.apply_insert(key, None, value);
417        }
418        if let Some(data_generation) = data_generation {
419            self.prefix_cardinality.mark_synchronized(data_generation);
420        }
421    }
422
423    fn entries_snapshot_for_cardinality(
424        backend: &IndexStoreBackend,
425    ) -> HeapBTreeMap<RawIndexStoreKey, IndexEntryValue> {
426        match backend {
427            IndexStoreBackend::Heap(map) => map.clone(),
428            IndexStoreBackend::Journaled { .. } => {
429                Self::journaled_entries_snapshot_for_fold(backend)
430            }
431        }
432    }
433
434    #[cfg(test)]
435    #[must_use]
436    pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
437        match &self.backend {
438            IndexStoreBackend::Journaled { canonical: map, .. } => map.len(),
439            IndexStoreBackend::Heap(_) => 0,
440        }
441    }
442
443    fn journaled_get(
444        backend: &IndexStoreBackend,
445        key: &RawIndexStoreKey,
446    ) -> Option<IndexEntryValue> {
447        let IndexStoreBackend::Journaled {
448            canonical,
449            live,
450            tombstones,
451        } = backend
452        else {
453            return None;
454        };
455
456        if tombstones.contains(key) {
457            return None;
458        }
459        live.get(key).cloned().or_else(|| canonical.get(key))
460    }
461
462    pub(super) fn journaled_entries_snapshot_for_fold(
463        backend: &IndexStoreBackend,
464    ) -> HeapBTreeMap<RawIndexStoreKey, IndexEntryValue> {
465        #[cfg(test)]
466        record_journaled_snapshot_call();
467
468        let IndexStoreBackend::Journaled {
469            canonical,
470            live,
471            tombstones,
472        } = backend
473        else {
474            return HeapBTreeMap::new();
475        };
476
477        let mut entries = HeapBTreeMap::new();
478        for entry in canonical.iter() {
479            let key = entry.key().clone();
480            if !tombstones.contains(&key) {
481                entries.insert(key, entry.value());
482            }
483        }
484        for (key, value) in live {
485            if !tombstones.contains(key) {
486                entries.insert(key.clone(), value.clone());
487            }
488        }
489
490        entries
491    }
492
493    pub(super) fn visit_journaled_entries_in_range<E>(
494        &self,
495        bounds: (&Bound<RawIndexStoreKey>, &Bound<RawIndexStoreKey>),
496        direction: Direction,
497        mut visit: impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<bool, E>,
498    ) -> Result<(), E> {
499        let IndexStoreBackend::Journaled {
500            canonical,
501            live,
502            tombstones,
503        } = &self.backend
504        else {
505            return Ok(());
506        };
507
508        let lower = bounds.0.clone();
509        let upper = bounds.1.clone();
510        match direction {
511            Direction::Asc if canonical.is_empty() => {
512                for (key, value) in live.range((lower, upper)) {
513                    if visit_index_store_entry(key, value, &mut visit)? {
514                        return Ok(());
515                    }
516                }
517            }
518            Direction::Desc if canonical.is_empty() => {
519                for (key, value) in live.range((lower, upper)).rev() {
520                    if visit_index_store_entry(key, value, &mut visit)? {
521                        return Ok(());
522                    }
523                }
524            }
525            Direction::Asc if live.is_empty() && tombstones.is_empty() => {
526                for entry in canonical.range((lower, upper)) {
527                    if visit_index_store_entry(entry.key(), &entry.value(), &mut visit)? {
528                        return Ok(());
529                    }
530                }
531            }
532            Direction::Desc if live.is_empty() && tombstones.is_empty() => {
533                for entry in canonical.range((lower, upper)).rev() {
534                    if visit_index_store_entry(entry.key(), &entry.value(), &mut visit)? {
535                        return Ok(());
536                    }
537                }
538            }
539            Direction::Asc => {
540                visit_ordered_overlay(
541                    canonical.range((lower.clone(), upper.clone())),
542                    live.range((lower, upper)),
543                    direction,
544                    |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
545                    |canonical_entry| !tombstones.contains(canonical_entry.key()),
546                    |live_entry| !tombstones.contains(live_entry.0),
547                    |entry| {
548                        let should_stop = match entry {
549                            OrderedOverlayEntry::Canonical(canonical_entry) => {
550                                visit_index_store_entry(
551                                    canonical_entry.key(),
552                                    &canonical_entry.value(),
553                                    &mut visit,
554                                )?
555                            }
556                            OrderedOverlayEntry::Live((key, value)) => {
557                                visit_index_store_entry(key, value, &mut visit)?
558                            }
559                        };
560                        Ok(if should_stop {
561                            OrderedOverlayVisit::Stop
562                        } else {
563                            OrderedOverlayVisit::Continue
564                        })
565                    },
566                )?;
567            }
568            Direction::Desc => {
569                visit_ordered_overlay(
570                    canonical.range((lower.clone(), upper.clone())).rev(),
571                    live.range((lower, upper)).rev(),
572                    direction,
573                    |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
574                    |canonical_entry| !tombstones.contains(canonical_entry.key()),
575                    |live_entry| !tombstones.contains(live_entry.0),
576                    |entry| {
577                        let should_stop = match entry {
578                            OrderedOverlayEntry::Canonical(canonical_entry) => {
579                                visit_index_store_entry(
580                                    canonical_entry.key(),
581                                    &canonical_entry.value(),
582                                    &mut visit,
583                                )?
584                            }
585                            OrderedOverlayEntry::Live((key, value)) => {
586                                visit_index_store_entry(key, value, &mut visit)?
587                            }
588                        };
589                        Ok(if should_stop {
590                            OrderedOverlayVisit::Stop
591                        } else {
592                            OrderedOverlayVisit::Continue
593                        })
594                    },
595                )?;
596            }
597        }
598
599        Ok(())
600    }
601}
602
603#[cfg(test)]
604mod tests {
605    use super::*;
606    use crate::{
607        db::{
608            direction::Direction,
609            index::{IndexId, IndexKey, IndexKeyKind},
610            key_taxonomy::{PrimaryKeyComponent, PrimaryKeyValue},
611        },
612        testing::test_memory,
613        traits::Storable,
614        types::EntityTag,
615    };
616    use std::{borrow::Cow, convert::Infallible};
617
618    fn raw_key(value: u8) -> RawIndexStoreKey {
619        <RawIndexStoreKey as Storable>::from_bytes(Cow::Owned(vec![value]))
620    }
621
622    fn indexed_raw_key(
623        index_id: &IndexId,
624        components: Vec<Vec<u8>>,
625        primary_key: u64,
626    ) -> RawIndexStoreKey {
627        indexed_raw_key_with_kind(index_id, IndexKeyKind::User, components, primary_key)
628    }
629
630    fn indexed_raw_key_with_kind(
631        index_id: &IndexId,
632        key_kind: IndexKeyKind,
633        components: Vec<Vec<u8>>,
634        primary_key: u64,
635    ) -> RawIndexStoreKey {
636        IndexKey::new_from_components_with_primary_key_value(
637            index_id,
638            key_kind,
639            components.as_slice(),
640            &PrimaryKeyValue::from(PrimaryKeyComponent::Nat64(primary_key)),
641        )
642        .to_raw()
643    }
644
645    fn malformed_index_entry_value() -> IndexEntryValue {
646        <IndexEntryValue as Storable>::from_bytes(Cow::Owned(vec![0xFF]))
647    }
648
649    fn missing_index_entry_value() -> IndexEntryValue {
650        <IndexEntryValue as Storable>::from_bytes(Cow::Owned(vec![1]))
651    }
652
653    #[test]
654    fn index_prefix_cardinality_requires_explicit_data_generation_sync() {
655        let index_id = IndexId::new(EntityTag::new(0xCA7D), 1);
656        let collection = b"collection-a".to_vec();
657        let draft = b"Draft".to_vec();
658        let review = b"Review".to_vec();
659        let mut store = IndexStore::init_heap();
660
661        store.insert(
662            indexed_raw_key(&index_id, vec![collection.clone(), draft.clone()], 1),
663            IndexEntryValue::presence(),
664        );
665        store.insert(
666            indexed_raw_key(&index_id, vec![collection.clone(), draft.clone()], 2),
667            IndexEntryValue::presence(),
668        );
669        store.insert(
670            indexed_raw_key(&index_id, vec![collection.clone(), review.clone()], 3),
671            IndexEntryValue::presence(),
672        );
673
674        assert_eq!(
675            store.exact_prefix_cardinality(
676                0,
677                IndexKeyKind::User,
678                index_id,
679                std::slice::from_ref(&collection),
680            ),
681            None,
682            "raw index mutations must not be trusted until row generation sync is stamped",
683        );
684
685        store.mark_prefix_cardinality_data_generation(7);
686
687        assert_eq!(
688            store.exact_prefix_cardinality(
689                7,
690                IndexKeyKind::User,
691                index_id,
692                std::slice::from_ref(&collection),
693            ),
694            Some(3),
695        );
696        assert_eq!(
697            store.exact_prefix_cardinality(
698                7,
699                IndexKeyKind::User,
700                index_id,
701                &[collection.clone(), draft],
702            ),
703            Some(2),
704        );
705        assert_eq!(
706            store.exact_prefix_cardinality(8, IndexKeyKind::User, index_id, &[collection, review],),
707            None,
708            "row generation drift should force the caller to use the existing-row fallback",
709        );
710    }
711
712    #[test]
713    fn index_prefix_cardinality_ignores_system_index_mutations() {
714        let user_index_id = IndexId::new(EntityTag::new(0xCA7D), 1);
715        let system_index_id = IndexId::new(EntityTag::new(0xCA7D), 2);
716        let collection = b"collection-a".to_vec();
717        let draft = b"Draft".to_vec();
718        let system_component = b"reverse-edge".to_vec();
719        let mut store = IndexStore::init_heap();
720
721        store.insert(
722            indexed_raw_key(&user_index_id, vec![collection.clone(), draft.clone()], 1),
723            IndexEntryValue::presence(),
724        );
725        store.mark_prefix_cardinality_data_generation(7);
726
727        assert_eq!(
728            store.exact_prefix_cardinality(
729                7,
730                IndexKeyKind::User,
731                user_index_id,
732                &[collection.clone(), draft.clone()],
733            ),
734            Some(1),
735        );
736
737        let system_key = indexed_raw_key_with_kind(
738            &system_index_id,
739            IndexKeyKind::System,
740            vec![system_component],
741            1,
742        );
743        store.insert(system_key.clone(), IndexEntryValue::presence());
744        assert_eq!(
745            store.exact_prefix_cardinality(
746                7,
747                IndexKeyKind::User,
748                user_index_id,
749                &[collection.clone(), draft.clone()],
750            ),
751            Some(1),
752            "system index writes must not invalidate synchronized user-prefix cardinality",
753        );
754
755        store.remove(&system_key);
756        assert_eq!(
757            store.exact_prefix_cardinality(
758                7,
759                IndexKeyKind::User,
760                user_index_id,
761                &[collection.clone(), draft.clone()],
762            ),
763            Some(1),
764            "system index removals must not invalidate synchronized user-prefix cardinality",
765        );
766
767        let malformed_system_key = indexed_raw_key_with_kind(
768            &system_index_id,
769            IndexKeyKind::System,
770            vec![b"malformed-reverse-edge".to_vec()],
771            2,
772        );
773        store.insert(malformed_system_key.clone(), malformed_index_entry_value());
774        assert_eq!(
775            store.exact_prefix_cardinality(
776                7,
777                IndexKeyKind::User,
778                user_index_id,
779                &[collection.clone(), draft.clone()],
780            ),
781            Some(1),
782            "malformed system index payloads must not invalidate user-prefix cardinality",
783        );
784
785        store.remove(&malformed_system_key);
786        assert_eq!(
787            store.exact_prefix_cardinality(
788                7,
789                IndexKeyKind::User,
790                user_index_id,
791                &[collection.clone(), draft],
792            ),
793            Some(1),
794            "malformed system index removals must not invalidate user-prefix cardinality",
795        );
796
797        let review = b"Review".to_vec();
798        store.insert(
799            indexed_raw_key(&user_index_id, vec![collection.clone(), review.clone()], 2),
800            IndexEntryValue::presence(),
801        );
802        assert_eq!(
803            store.exact_prefix_cardinality(
804                7,
805                IndexKeyKind::User,
806                user_index_id,
807                &[collection, review]
808            ),
809            None,
810            "user-prefix count changes must still require a fresh row-generation stamp",
811        );
812    }
813
814    #[test]
815    fn index_prefix_cardinality_ignores_missing_user_index_mutations() {
816        let index_id = IndexId::new(EntityTag::new(0xCA7D), 1);
817        let collection = b"collection-a".to_vec();
818        let draft = b"Draft".to_vec();
819        let mut store = IndexStore::init_heap();
820
821        store.insert(
822            indexed_raw_key(&index_id, vec![collection.clone(), draft.clone()], 1),
823            IndexEntryValue::presence(),
824        );
825        store.mark_prefix_cardinality_data_generation(7);
826
827        let stale_key = indexed_raw_key(&index_id, vec![collection.clone(), draft.clone()], 2);
828        store.insert(stale_key.clone(), missing_index_entry_value());
829        assert_eq!(
830            store.exact_prefix_cardinality(
831                7,
832                IndexKeyKind::User,
833                index_id,
834                &[collection.clone(), draft.clone()],
835            ),
836            Some(1),
837            "missing user index entries must not affect synchronized prefix cardinality",
838        );
839
840        store.remove(&stale_key);
841        assert_eq!(
842            store.exact_prefix_cardinality(7, IndexKeyKind::User, index_id, &[collection, draft],),
843            Some(1),
844            "missing user index removals must not affect synchronized prefix cardinality",
845        );
846    }
847
848    #[cfg(feature = "diagnostics")]
849    #[test]
850    fn index_store_diagnostic_counters_record_gets_and_entry_reads() {
851        let mut store = IndexStore::init_heap();
852        store.insert(raw_key(7), IndexEntryValue::presence());
853        store.insert(raw_key(9), IndexEntryValue::presence());
854
855        let gets_before = IndexStore::current_get_call_count();
856        assert_eq!(store.get(&raw_key(7)), Some(IndexEntryValue::presence()));
857        assert_eq!(store.get(&raw_key(8)), None);
858
859        assert_eq!(
860            IndexStore::current_get_call_count().saturating_sub(gets_before),
861            2,
862            "diagnostic index-store get counter should count both hit and miss reads",
863        );
864
865        let entries_before = IndexStore::current_entry_read_count();
866        store
867            .visit_entries(|_key, _entry| Ok::<_, Infallible>(IndexStoreVisit::Continue))
868            .expect("index entry visit should succeed");
869
870        assert_eq!(
871            IndexStore::current_entry_read_count().saturating_sub(entries_before),
872            2,
873            "diagnostic index-store entry counter should count yielded traversal entries",
874        );
875    }
876
877    #[test]
878    fn journaled_mixed_index_range_traversal_streams_without_snapshot() {
879        let mut store = IndexStore::init_journaled(test_memory(93));
880        for value in [1_u8, 3, 5] {
881            store.insert(raw_key(value), IndexEntryValue::presence());
882        }
883        store
884            .fold_journaled_materialized_view()
885            .expect("canonical index seed should fold");
886
887        store.insert(raw_key(0), IndexEntryValue::presence());
888        store.insert(raw_key(4), IndexEntryValue::presence());
889        store.insert(raw_key(5), IndexEntryValue::presence());
890        store.remove(&raw_key(1));
891
892        let lower = Bound::Included(raw_key(0));
893        let upper = Bound::Included(raw_key(5));
894
895        reset_journaled_snapshot_call_count_for_tests();
896        let mut asc = Vec::new();
897        store
898            .visit_journaled_entries_in_range((&lower, &upper), Direction::Asc, |key, _value| {
899                asc.push(key.as_bytes()[0]);
900                Ok::<_, Infallible>(asc.len() == 2)
901            })
902            .expect("asc journaled index range traversal should succeed");
903        assert_eq!(asc, vec![0, 3]);
904        assert_eq!(
905            journaled_snapshot_call_count_for_tests(),
906            0,
907            "mixed journaled index range traversal should preserve early stop without materializing a snapshot",
908        );
909
910        reset_journaled_snapshot_call_count_for_tests();
911        let mut desc = Vec::new();
912        store
913            .visit_journaled_entries_in_range((&lower, &upper), Direction::Desc, |key, _value| {
914                desc.push(key.as_bytes()[0]);
915                Ok::<_, Infallible>(desc.len() == 2)
916            })
917            .expect("desc journaled index range traversal should succeed");
918        assert_eq!(desc, vec![5, 4]);
919        assert_eq!(
920            journaled_snapshot_call_count_for_tests(),
921            0,
922            "mixed reverse journaled index range traversal should preserve early stop without materializing a snapshot",
923        );
924    }
925}