1use crate::db::{
7 direction::Direction,
8 index::{
9 IndexEntryValue, IndexId, IndexKeyKind, cardinality::IndexPrefixCardinality,
10 key::RawIndexStoreKey,
11 },
12 ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
13};
14
15use candid::CandidType;
16use ic_memory::stable_structures::{
17 BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
18};
19use serde::Deserialize;
20#[cfg(any(test, feature = "diagnostics"))]
21use std::cell::Cell;
22use std::collections::{BTreeMap as HeapBTreeMap, BTreeSet};
23use std::ops::Bound;
24
#[cfg(test)]
thread_local! {
    /// Test-only, per-thread count of journaled snapshot materializations.
    ///
    /// Incremented by `record_journaled_snapshot_call`, which
    /// `IndexStore::journaled_entries_snapshot_for_fold` invokes on entry.
    static JOURNALED_SNAPSHOT_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
}
29
#[cfg(feature = "diagnostics")]
thread_local! {
    /// Per-thread count of `IndexStore::get` calls.
    static INDEX_STORE_GET_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
    /// Per-thread count of range scans reported through `IndexStore::record_range_scan_call`.
    static INDEX_STORE_RANGE_SCAN_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
    /// Per-thread count of entries handed to a visitor during store traversal.
    static INDEX_STORE_ENTRY_READ_COUNT: Cell<u64> = const { Cell::new(0) };
    /// Per-thread count of prefix-cardinality lookups (single-prefix and summed).
    static INDEX_STORE_PREFIX_CARDINALITY_LOOKUP_COUNT: Cell<u64> = const { Cell::new(0) };
}
37
/// Adds one to the diagnostics counter of `IndexStore::get` calls.
///
/// The counter saturates at `u64::MAX` rather than wrapping.
#[cfg(feature = "diagnostics")]
fn record_index_store_get_call() {
    INDEX_STORE_GET_CALL_COUNT.with(|calls| {
        let bumped = calls.get().saturating_add(1);
        calls.set(bumped);
    });
}
44
/// Adds one to the diagnostics counter of index-store range scans.
///
/// The counter saturates at `u64::MAX` rather than wrapping.
#[cfg(feature = "diagnostics")]
fn record_index_store_range_scan_call() {
    INDEX_STORE_RANGE_SCAN_CALL_COUNT.with(|scans| {
        let bumped = scans.get().saturating_add(1);
        scans.set(bumped);
    });
}
51
/// Adds one to the diagnostics counter of entries yielded to a visitor.
///
/// The counter saturates at `u64::MAX` rather than wrapping.
#[cfg(feature = "diagnostics")]
fn record_index_store_entry_read() {
    INDEX_STORE_ENTRY_READ_COUNT.with(|reads| {
        let bumped = reads.get().saturating_add(1);
        reads.set(bumped);
    });
}
58
/// Adds one to the diagnostics counter of prefix-cardinality lookups.
///
/// The counter saturates at `u64::MAX` rather than wrapping.
#[cfg(feature = "diagnostics")]
fn record_index_store_prefix_cardinality_lookup() {
    INDEX_STORE_PREFIX_CARDINALITY_LOOKUP_COUNT.with(|lookups| {
        let bumped = lookups.get().saturating_add(1);
        lookups.set(bumped);
    });
}
65
66fn visit_index_store_entry<E>(
67 key: &RawIndexStoreKey,
68 value: &IndexEntryValue,
69 visit: &mut impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<bool, E>,
70) -> Result<bool, E> {
71 #[cfg(feature = "diagnostics")]
72 record_index_store_entry_read();
73
74 visit(key, value)
75}
76
/// Test-only: adds one to the count of journaled snapshot materializations.
///
/// The counter saturates at `u64::MAX` rather than wrapping.
#[cfg(test)]
fn record_journaled_snapshot_call() {
    JOURNALED_SNAPSHOT_CALL_COUNT.with(|snapshots| {
        let bumped = snapshots.get().saturating_add(1);
        snapshots.set(bumped);
    });
}
83
/// Test-only: zeroes this thread's journaled snapshot counter.
#[cfg(test)]
fn reset_journaled_snapshot_call_count_for_tests() {
    JOURNALED_SNAPSHOT_CALL_COUNT.set(0);
}
88
/// Test-only: reads this thread's journaled snapshot counter.
#[cfg(test)]
fn journaled_snapshot_call_count_for_tests() -> u64 {
    JOURNALED_SNAPSHOT_CALL_COUNT.get()
}
93
/// Lifecycle marker carried by an `IndexStore`.
///
/// `Ready` is the default and is what both store constructors start with.
/// NOTE(review): the exact meaning of each state is decided by callers outside
/// this file; the variant docs below only restate the names.
#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
pub enum IndexState {
    /// Set by `IndexStore::mark_building`; rendered as `"building"`.
    Building,
    /// Default state; set by `IndexStore::mark_ready`; rendered as `"ready"`.
    #[default]
    Ready,
    /// Set by `IndexStore::mark_dropping`; rendered as `"dropping"`.
    Dropping,
}
108
109impl IndexState {
110 #[must_use]
112 pub const fn as_str(self) -> &'static str {
113 match self {
114 Self::Building => "building",
115 Self::Ready => "ready",
116 Self::Dropping => "dropping",
117 }
118 }
119}
120
/// Index entry store with a pluggable backend, a mutation generation counter,
/// a lifecycle state and a prefix-cardinality tracker.
pub struct IndexStore {
    /// Storage for the entries (heap-only or journaled).
    pub(super) backend: IndexStoreBackend,
    /// Bumped (saturating) by every insert, remove, clear and fold.
    generation: u64,
    /// Lifecycle marker; changed only through the `mark_*` methods.
    state: IndexState,
    /// Prefix counts maintained alongside entry mutations.
    prefix_cardinality: IndexPrefixCardinality,
}
135
/// Physical storage behind an `IndexStore`.
pub(super) enum IndexStoreBackend {
    /// All entries live in one in-heap ordered map.
    Heap(HeapBTreeMap<RawIndexStoreKey, IndexEntryValue>),
    /// A stable-memory map overlaid by in-heap pending changes.
    ///
    /// Reads treat a tombstoned key as absent, otherwise prefer `live` and
    /// fall back to `canonical`.
    Journaled {
        /// Entries held in the stable-structures map.
        canonical:
            StableBTreeMap<RawIndexStoreKey, IndexEntryValue, VirtualMemory<DefaultMemoryImpl>>,
        /// Inserts not yet folded into `canonical`.
        live: HeapBTreeMap<RawIndexStoreKey, IndexEntryValue>,
        /// Keys removed since the last fold; they hide matching `canonical` entries.
        tombstones: BTreeSet<RawIndexStoreKey>,
    },
}
145
/// Verdict returned by a `visit_entries` visitor after each entry.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) enum IndexStoreVisit {
    /// Keep traversing.
    Continue,
    /// End the traversal after the current entry.
    Stop,
}
152
impl IndexStoreVisit {
    /// Returns `true` only for `Stop`.
    const fn should_stop(self) -> bool {
        matches!(self, Self::Stop)
    }
}
158
159impl IndexStore {
160 #[must_use]
162 pub const fn init_heap() -> Self {
163 Self {
164 backend: IndexStoreBackend::Heap(HeapBTreeMap::new()),
165 generation: 0,
166 state: IndexState::Ready,
167 prefix_cardinality: IndexPrefixCardinality::synchronized_empty(),
168 }
169 }
170
171 #[must_use]
176 pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
177 let mut store = Self {
178 backend: IndexStoreBackend::Journaled {
179 canonical: StableBTreeMap::init(memory),
180 live: HeapBTreeMap::new(),
181 tombstones: BTreeSet::new(),
182 },
183 generation: 0,
184 state: IndexState::Ready,
185 prefix_cardinality: IndexPrefixCardinality::synchronized_empty(),
186 };
187 store.rebuild_prefix_cardinality_from_entries(Some(0));
188 store
189 }
190
191 pub(in crate::db) fn visit_entries<E>(
194 &self,
195 mut visitor: impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<IndexStoreVisit, E>,
196 ) -> Result<(), E> {
197 match &self.backend {
198 IndexStoreBackend::Heap(map) => {
199 for (key, value) in map {
200 #[cfg(feature = "diagnostics")]
201 record_index_store_entry_read();
202
203 if visitor(key, value)?.should_stop() {
204 return Ok(());
205 }
206 }
207 }
208 IndexStoreBackend::Journaled {
209 canonical: _,
210 live: _,
211 tombstones: _,
212 } => self.visit_journaled_entries_in_range(
213 (&Bound::Unbounded, &Bound::Unbounded),
214 Direction::Asc,
215 |key, value| visitor(key, value).map(IndexStoreVisit::should_stop),
216 )?,
217 }
218
219 Ok(())
220 }
221
222 pub(in crate::db) fn get(&self, key: &RawIndexStoreKey) -> Option<IndexEntryValue> {
223 #[cfg(feature = "diagnostics")]
224 record_index_store_get_call();
225
226 match &self.backend {
227 IndexStoreBackend::Heap(map) => map.get(key).cloned(),
228 IndexStoreBackend::Journaled { .. } => Self::journaled_get(&self.backend, key),
229 }
230 }
231
232 pub fn len(&self) -> u64 {
233 match &self.backend {
234 IndexStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
235 IndexStoreBackend::Journaled { .. } => {
236 let mut count = 0_u64;
237 let _: Result<(), std::convert::Infallible> = self.visit_entries(|_key, _value| {
238 count = count.saturating_add(1);
239 Ok(IndexStoreVisit::Continue)
240 });
241 count
242 }
243 }
244 }
245
246 pub fn is_empty(&self) -> bool {
247 match &self.backend {
248 IndexStoreBackend::Heap(map) => map.is_empty(),
249 IndexStoreBackend::Journaled { .. } => {
250 let mut empty = true;
251 let _: Result<(), std::convert::Infallible> = self.visit_entries(|_key, _value| {
252 empty = false;
253 Ok(IndexStoreVisit::Stop)
254 });
255 empty
256 }
257 }
258 }
259
/// Returns the store's mutation generation counter.
///
/// The counter is bumped by every insert, remove, clear and fold.
#[must_use]
pub(in crate::db) const fn generation(&self) -> u64 {
    self.generation
}
264
/// Returns the store's current lifecycle state.
#[must_use]
pub(in crate::db) const fn state(&self) -> IndexState {
    self.state
}
270
271 #[must_use]
274 pub(in crate::db) fn exact_prefix_cardinality(
275 &self,
276 data_generation: u64,
277 key_kind: IndexKeyKind,
278 index_id: IndexId,
279 components: &[Vec<u8>],
280 ) -> Option<u64> {
281 #[cfg(feature = "diagnostics")]
282 record_index_store_prefix_cardinality_lookup();
283
284 self.prefix_cardinality
285 .exact_count(data_generation, key_kind, index_id, components)
286 }
287
288 #[must_use]
291 pub(in crate::db) fn exact_prefix_cardinality_sum<'a>(
292 &self,
293 data_generation: u64,
294 key_kind: IndexKeyKind,
295 index_id: IndexId,
296 component_prefixes: impl IntoIterator<Item = &'a [Vec<u8>]>,
297 stop_after: Option<u64>,
298 ) -> Option<u64> {
299 #[cfg(feature = "diagnostics")]
300 record_index_store_prefix_cardinality_lookup();
301
302 self.prefix_cardinality.exact_count_sum(
303 data_generation,
304 key_kind,
305 index_id,
306 component_prefixes,
307 stop_after,
308 )
309 }
310
/// Stamps the prefix-cardinality tracker as synchronized with `generation`.
///
/// The tests in this file show that exact counts are only served for the
/// generation stamped here.
pub(in crate::db) const fn mark_prefix_cardinality_data_generation(&mut self, generation: u64) {
    self.prefix_cardinality.mark_synchronized(generation);
}
316
/// Sets the lifecycle state to `IndexState::Building`.
pub(in crate::db) const fn mark_building(&mut self) {
    self.state = IndexState::Building;
}
322
/// Sets the lifecycle state to `IndexState::Ready`.
pub(in crate::db) const fn mark_ready(&mut self) {
    self.state = IndexState::Ready;
}
327
/// Sets the lifecycle state to `IndexState::Dropping`.
pub(in crate::db) const fn mark_dropping(&mut self) {
    self.state = IndexState::Dropping;
}
332
333 pub(crate) fn insert(
334 &mut self,
335 key: RawIndexStoreKey,
336 entry: IndexEntryValue,
337 ) -> Option<IndexEntryValue> {
338 let previous_journaled = if matches!(self.backend, IndexStoreBackend::Journaled { .. }) {
339 self.get(&key)
340 } else {
341 None
342 };
343 let cardinality_key = key.clone();
344 let previous = match &mut self.backend {
345 IndexStoreBackend::Heap(map) => map.insert(key, entry.clone()),
346 IndexStoreBackend::Journaled {
347 live, tombstones, ..
348 } => {
349 tombstones.remove(&key);
350 live.insert(key, entry.clone());
351 previous_journaled
352 }
353 };
354 self.prefix_cardinality
355 .apply_insert(&cardinality_key, previous.as_ref(), &entry);
356 self.bump_generation();
357 previous
358 }
359
360 pub(crate) fn remove(&mut self, key: &RawIndexStoreKey) -> Option<IndexEntryValue> {
361 let previous_journaled = if matches!(self.backend, IndexStoreBackend::Journaled { .. }) {
362 self.get(key)
363 } else {
364 None
365 };
366 let previous = match &mut self.backend {
367 IndexStoreBackend::Heap(map) => map.remove(key),
368 IndexStoreBackend::Journaled {
369 live, tombstones, ..
370 } => {
371 live.remove(key);
372 tombstones.insert(key.clone());
373 previous_journaled
374 }
375 };
376 self.prefix_cardinality.apply_remove(key, previous.as_ref());
377 self.bump_generation();
378 previous
379 }
380
381 pub fn clear(&mut self) {
382 match &mut self.backend {
383 IndexStoreBackend::Heap(map) => map.clear(),
384 IndexStoreBackend::Journaled {
385 canonical,
386 live,
387 tombstones,
388 } => {
389 live.clear();
390 tombstones.clear();
391 for entry in canonical.iter() {
392 tombstones.insert(entry.key().clone());
393 }
394 }
395 }
396 self.prefix_cardinality.clear_unsynchronized();
397 self.bump_generation();
398 }
399
400 pub(in crate::db) fn fold_journaled_materialized_view(
403 &mut self,
404 ) -> Result<(), crate::error::InternalError> {
405 let entries = Self::journaled_entries_snapshot_for_fold(&self.backend);
406 let IndexStoreBackend::Journaled {
407 canonical,
408 live,
409 tombstones,
410 } = &mut self.backend
411 else {
412 return Err(crate::error::InternalError::store_invariant());
413 };
414
415 canonical.clear_new();
416 for (key, value) in entries {
417 canonical.insert(key, value);
418 }
419 live.clear();
420 tombstones.clear();
421 let data_generation = self.prefix_cardinality.synchronized_generation();
422 self.rebuild_prefix_cardinality_from_entries(data_generation);
423 self.bump_generation();
424
425 Ok(())
426 }
427
428 pub fn memory_bytes(&self) -> u64 {
430 let mut bytes = 0u64;
431 let _: Result<(), std::convert::Infallible> = self.visit_entries(|key, value| {
432 bytes = bytes.saturating_add(key.as_bytes().len() as u64 + value.len() as u64);
433 Ok(IndexStoreVisit::Continue)
434 });
435 bytes
436 }
437
/// Returns this thread's diagnostics count of `get` calls.
#[cfg(feature = "diagnostics")]
pub(in crate::db) fn current_get_call_count() -> u64 {
    INDEX_STORE_GET_CALL_COUNT.get()
}
443
/// Returns this thread's diagnostics count of range scans.
#[cfg(feature = "diagnostics")]
pub(in crate::db) fn current_range_scan_call_count() -> u64 {
    INDEX_STORE_RANGE_SCAN_CALL_COUNT.get()
}
449
/// Returns this thread's diagnostics count of entries yielded to visitors.
#[cfg(feature = "diagnostics")]
pub(in crate::db) fn current_entry_read_count() -> u64 {
    INDEX_STORE_ENTRY_READ_COUNT.get()
}
455
/// Returns this thread's diagnostics count of prefix-cardinality lookups.
///
/// Only compiled for test builds that also enable `diagnostics`.
#[cfg(all(test, feature = "diagnostics"))]
pub(in crate::db) fn current_prefix_cardinality_lookup_count() -> u64 {
    INDEX_STORE_PREFIX_CARDINALITY_LOOKUP_COUNT.get()
}
461
/// Counts one range scan in the diagnostics counters.
///
/// Exposed to the rest of `crate::db::index`; it only forwards to the
/// module-private recorder.
#[cfg(feature = "diagnostics")]
pub(in crate::db::index) fn record_range_scan_call() {
    record_index_store_range_scan_call();
}
466
467 const fn bump_generation(&mut self) {
468 self.generation = self.generation.saturating_add(1);
469 }
470
471 fn rebuild_prefix_cardinality_from_entries(&mut self, data_generation: Option<u64>) {
472 self.prefix_cardinality.clear_unsynchronized();
473 let entries = Self::entries_snapshot_for_cardinality(&self.backend);
474 for (key, value) in &entries {
475 self.prefix_cardinality.apply_insert(key, None, value);
476 }
477 if let Some(data_generation) = data_generation {
478 self.prefix_cardinality.mark_synchronized(data_generation);
479 }
480 }
481
482 fn entries_snapshot_for_cardinality(
483 backend: &IndexStoreBackend,
484 ) -> HeapBTreeMap<RawIndexStoreKey, IndexEntryValue> {
485 match backend {
486 IndexStoreBackend::Heap(map) => map.clone(),
487 IndexStoreBackend::Journaled { .. } => {
488 Self::journaled_entries_snapshot_for_fold(backend)
489 }
490 }
491 }
492
/// Test-only: length of the canonical map, or `0` for the heap backend.
#[cfg(test)]
#[must_use]
pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
    if let IndexStoreBackend::Journaled { canonical, .. } = &self.backend {
        canonical.len()
    } else {
        0
    }
}
501
502 fn journaled_get(
503 backend: &IndexStoreBackend,
504 key: &RawIndexStoreKey,
505 ) -> Option<IndexEntryValue> {
506 let IndexStoreBackend::Journaled {
507 canonical,
508 live,
509 tombstones,
510 } = backend
511 else {
512 return None;
513 };
514
515 if tombstones.contains(key) {
516 return None;
517 }
518 live.get(key).cloned().or_else(|| canonical.get(key))
519 }
520
521 pub(super) fn journaled_entries_snapshot_for_fold(
522 backend: &IndexStoreBackend,
523 ) -> HeapBTreeMap<RawIndexStoreKey, IndexEntryValue> {
524 #[cfg(test)]
525 record_journaled_snapshot_call();
526
527 let IndexStoreBackend::Journaled {
528 canonical,
529 live,
530 tombstones,
531 } = backend
532 else {
533 return HeapBTreeMap::new();
534 };
535
536 let mut entries = HeapBTreeMap::new();
537 for entry in canonical.iter() {
538 let key = entry.key().clone();
539 if !tombstones.contains(&key) {
540 entries.insert(key, entry.value());
541 }
542 }
543 for (key, value) in live {
544 if !tombstones.contains(key) {
545 entries.insert(key.clone(), value.clone());
546 }
547 }
548
549 entries
550 }
551
552 pub(super) fn visit_journaled_entries_in_range<E>(
553 &self,
554 bounds: (&Bound<RawIndexStoreKey>, &Bound<RawIndexStoreKey>),
555 direction: Direction,
556 mut visit: impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<bool, E>,
557 ) -> Result<(), E> {
558 let IndexStoreBackend::Journaled {
559 canonical,
560 live,
561 tombstones,
562 } = &self.backend
563 else {
564 return Ok(());
565 };
566
567 let lower = bounds.0.clone();
568 let upper = bounds.1.clone();
569 match direction {
570 Direction::Asc if canonical.is_empty() => {
571 for (key, value) in live.range((lower, upper)) {
572 if visit_index_store_entry(key, value, &mut visit)? {
573 return Ok(());
574 }
575 }
576 }
577 Direction::Desc if canonical.is_empty() => {
578 for (key, value) in live.range((lower, upper)).rev() {
579 if visit_index_store_entry(key, value, &mut visit)? {
580 return Ok(());
581 }
582 }
583 }
584 Direction::Asc if live.is_empty() && tombstones.is_empty() => {
585 for entry in canonical.range((lower, upper)) {
586 if visit_index_store_entry(entry.key(), &entry.value(), &mut visit)? {
587 return Ok(());
588 }
589 }
590 }
591 Direction::Desc if live.is_empty() && tombstones.is_empty() => {
592 for entry in canonical.range((lower, upper)).rev() {
593 if visit_index_store_entry(entry.key(), &entry.value(), &mut visit)? {
594 return Ok(());
595 }
596 }
597 }
598 Direction::Asc => {
599 visit_ordered_overlay(
600 canonical.range((lower.clone(), upper.clone())),
601 live.range((lower, upper)),
602 direction,
603 |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
604 |canonical_entry| !tombstones.contains(canonical_entry.key()),
605 |live_entry| !tombstones.contains(live_entry.0),
606 |entry| {
607 let should_stop = match entry {
608 OrderedOverlayEntry::Canonical(canonical_entry) => {
609 visit_index_store_entry(
610 canonical_entry.key(),
611 &canonical_entry.value(),
612 &mut visit,
613 )?
614 }
615 OrderedOverlayEntry::Live((key, value)) => {
616 visit_index_store_entry(key, value, &mut visit)?
617 }
618 };
619 Ok(if should_stop {
620 OrderedOverlayVisit::Stop
621 } else {
622 OrderedOverlayVisit::Continue
623 })
624 },
625 )?;
626 }
627 Direction::Desc => {
628 visit_ordered_overlay(
629 canonical.range((lower.clone(), upper.clone())).rev(),
630 live.range((lower, upper)).rev(),
631 direction,
632 |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
633 |canonical_entry| !tombstones.contains(canonical_entry.key()),
634 |live_entry| !tombstones.contains(live_entry.0),
635 |entry| {
636 let should_stop = match entry {
637 OrderedOverlayEntry::Canonical(canonical_entry) => {
638 visit_index_store_entry(
639 canonical_entry.key(),
640 &canonical_entry.value(),
641 &mut visit,
642 )?
643 }
644 OrderedOverlayEntry::Live((key, value)) => {
645 visit_index_store_entry(key, value, &mut visit)?
646 }
647 };
648 Ok(if should_stop {
649 OrderedOverlayVisit::Stop
650 } else {
651 OrderedOverlayVisit::Continue
652 })
653 },
654 )?;
655 }
656 }
657
658 Ok(())
659 }
660}
661
662#[cfg(test)]
663mod tests {
664 use super::*;
665 use crate::{
666 db::{
667 direction::Direction,
668 index::{IndexId, IndexKey, IndexKeyKind},
669 key_taxonomy::{PrimaryKeyComponent, PrimaryKeyValue},
670 },
671 testing::test_memory,
672 traits::Storable,
673 types::EntityTag,
674 };
675 use std::{borrow::Cow, convert::Infallible};
676
677 fn raw_key(value: u8) -> RawIndexStoreKey {
678 <RawIndexStoreKey as Storable>::from_bytes(Cow::Owned(vec![value]))
679 }
680
681 fn indexed_raw_key(
682 index_id: &IndexId,
683 components: Vec<Vec<u8>>,
684 primary_key: u64,
685 ) -> RawIndexStoreKey {
686 indexed_raw_key_with_kind(index_id, IndexKeyKind::User, components, primary_key)
687 }
688
689 fn indexed_raw_key_with_kind(
690 index_id: &IndexId,
691 key_kind: IndexKeyKind,
692 components: Vec<Vec<u8>>,
693 primary_key: u64,
694 ) -> RawIndexStoreKey {
695 IndexKey::new_from_components_with_primary_key_value(
696 index_id,
697 key_kind,
698 components.as_slice(),
699 &PrimaryKeyValue::from(PrimaryKeyComponent::Nat64(primary_key)),
700 )
701 .to_raw()
702 }
703
704 fn malformed_index_entry_value() -> IndexEntryValue {
705 <IndexEntryValue as Storable>::from_bytes(Cow::Owned(vec![0xFF]))
706 }
707
708 fn missing_index_entry_value() -> IndexEntryValue {
709 <IndexEntryValue as Storable>::from_bytes(Cow::Owned(vec![1]))
710 }
711
/// Exact prefix counts are served only for the explicitly stamped generation.
#[test]
fn index_prefix_cardinality_requires_explicit_data_generation_sync() {
    let index_id = IndexId::new(EntityTag::new(0xCA7D), 1);
    let collection = b"collection-a".to_vec();
    let draft = b"Draft".to_vec();
    let review = b"Review".to_vec();
    let mut store = IndexStore::init_heap();

    // Two rows under (collection, Draft) and one under (collection, Review).
    for (status, primary_key) in [(&draft, 1_u64), (&draft, 2), (&review, 3)] {
        store.insert(
            indexed_raw_key(
                &index_id,
                vec![collection.clone(), status.clone()],
                primary_key,
            ),
            IndexEntryValue::presence(),
        );
    }

    let before_stamp = store.exact_prefix_cardinality(
        0,
        IndexKeyKind::User,
        index_id,
        std::slice::from_ref(&collection),
    );
    assert_eq!(
        before_stamp, None,
        "raw index mutations must not be trusted until row generation sync is stamped",
    );

    store.mark_prefix_cardinality_data_generation(7);

    let collection_count = store.exact_prefix_cardinality(
        7,
        IndexKeyKind::User,
        index_id,
        std::slice::from_ref(&collection),
    );
    assert_eq!(collection_count, Some(3));

    let draft_count = store.exact_prefix_cardinality(
        7,
        IndexKeyKind::User,
        index_id,
        &[collection.clone(), draft],
    );
    assert_eq!(draft_count, Some(2));

    let drifted = store.exact_prefix_cardinality(
        8,
        IndexKeyKind::User,
        index_id,
        &[collection, review],
    );
    assert_eq!(
        drifted, None,
        "row generation drift should force the caller to use the existing-row fallback",
    );
}
770
/// System-kind index writes and removals leave synchronized user-prefix
/// counts intact, while a user-kind write still invalidates them.
#[test]
fn index_prefix_cardinality_ignores_system_index_mutations() {
    let user_index_id = IndexId::new(EntityTag::new(0xCA7D), 1);
    let system_index_id = IndexId::new(EntityTag::new(0xCA7D), 2);
    let collection = b"collection-a".to_vec();
    let draft_prefix = vec![collection.clone(), b"Draft".to_vec()];
    let mut store = IndexStore::init_heap();

    // Every probe asks the same question: how many entries sit under
    // (collection, Draft) at data generation 7?
    let draft_count = |store: &IndexStore| {
        store.exact_prefix_cardinality(7, IndexKeyKind::User, user_index_id, &draft_prefix)
    };

    store.insert(
        indexed_raw_key(&user_index_id, draft_prefix.clone(), 1),
        IndexEntryValue::presence(),
    );
    store.mark_prefix_cardinality_data_generation(7);
    assert_eq!(draft_count(&store), Some(1));

    let system_key = indexed_raw_key_with_kind(
        &system_index_id,
        IndexKeyKind::System,
        vec![b"reverse-edge".to_vec()],
        1,
    );
    store.insert(system_key.clone(), IndexEntryValue::presence());
    assert_eq!(
        draft_count(&store),
        Some(1),
        "system index writes must not invalidate synchronized user-prefix cardinality",
    );

    store.remove(&system_key);
    assert_eq!(
        draft_count(&store),
        Some(1),
        "system index removals must not invalidate synchronized user-prefix cardinality",
    );

    let malformed_system_key = indexed_raw_key_with_kind(
        &system_index_id,
        IndexKeyKind::System,
        vec![b"malformed-reverse-edge".to_vec()],
        2,
    );
    store.insert(malformed_system_key.clone(), malformed_index_entry_value());
    assert_eq!(
        draft_count(&store),
        Some(1),
        "malformed system index payloads must not invalidate user-prefix cardinality",
    );

    store.remove(&malformed_system_key);
    assert_eq!(
        draft_count(&store),
        Some(1),
        "malformed system index removals must not invalidate user-prefix cardinality",
    );

    let review_prefix = vec![collection, b"Review".to_vec()];
    store.insert(
        indexed_raw_key(&user_index_id, review_prefix.clone(), 2),
        IndexEntryValue::presence(),
    );
    assert_eq!(
        store.exact_prefix_cardinality(7, IndexKeyKind::User, user_index_id, &review_prefix),
        None,
        "user-prefix count changes must still require a fresh row-generation stamp",
    );
}
872
/// Inserting or removing a "missing" user entry value leaves the synchronized
/// prefix count unchanged.
#[test]
fn index_prefix_cardinality_ignores_missing_user_index_mutations() {
    let index_id = IndexId::new(EntityTag::new(0xCA7D), 1);
    let draft_prefix = vec![b"collection-a".to_vec(), b"Draft".to_vec()];
    let mut store = IndexStore::init_heap();

    let draft_count = |store: &IndexStore| {
        store.exact_prefix_cardinality(7, IndexKeyKind::User, index_id, &draft_prefix)
    };

    store.insert(
        indexed_raw_key(&index_id, draft_prefix.clone(), 1),
        IndexEntryValue::presence(),
    );
    store.mark_prefix_cardinality_data_generation(7);

    let stale_key = indexed_raw_key(&index_id, draft_prefix.clone(), 2);
    store.insert(stale_key.clone(), missing_index_entry_value());
    assert_eq!(
        draft_count(&store),
        Some(1),
        "missing user index entries must not affect synchronized prefix cardinality",
    );

    store.remove(&stale_key);
    assert_eq!(
        draft_count(&store),
        Some(1),
        "missing user index removals must not affect synchronized prefix cardinality",
    );
}
906
/// The diagnostics counters advance for gets, range scans and yielded entries.
///
/// Counters are thread-local and never reset here, so each check compares
/// against a baseline taken just before the probed operation.
#[cfg(feature = "diagnostics")]
#[test]
fn index_store_diagnostic_counters_record_gets_range_scans_and_entry_reads() {
    let mut store = IndexStore::init_heap();
    for byte in [7_u8, 9] {
        store.insert(raw_key(byte), IndexEntryValue::presence());
    }

    let gets_before = IndexStore::current_get_call_count();
    assert_eq!(store.get(&raw_key(7)), Some(IndexEntryValue::presence()));
    assert_eq!(store.get(&raw_key(8)), None);
    let gets_recorded = IndexStore::current_get_call_count().saturating_sub(gets_before);
    assert_eq!(
        gets_recorded, 2,
        "diagnostic index-store get counter should count both hit and miss reads",
    );

    let range_scans_before = IndexStore::current_range_scan_call_count();
    let lower = Bound::Included(raw_key(7));
    let upper = Bound::Included(raw_key(9));
    store
        .visit_raw_entries_in_range((&lower, &upper), Direction::Asc, |_key, _entry| Ok(false))
        .expect("raw index range visit should succeed");
    let range_scans_recorded =
        IndexStore::current_range_scan_call_count().saturating_sub(range_scans_before);
    assert_eq!(
        range_scans_recorded, 1,
        "diagnostic index-store range-scan counter should count one range traversal probe",
    );

    let entries_before = IndexStore::current_entry_read_count();
    store
        .visit_entries(|_key, _entry| Ok::<_, Infallible>(IndexStoreVisit::Continue))
        .expect("index entry visit should succeed");
    let entries_recorded =
        IndexStore::current_entry_read_count().saturating_sub(entries_before);
    assert_eq!(
        entries_recorded, 2,
        "diagnostic index-store entry counter should count yielded traversal entries",
    );
}
948
/// A journaled store with canonical, live and tombstoned keys streams range
/// visits in both directions, honours early stop, and never materializes a
/// snapshot.
#[test]
fn journaled_mixed_index_range_traversal_streams_without_snapshot() {
    let mut store = IndexStore::init_journaled(test_memory(93));

    // Seed the canonical map with 1, 3 and 5.
    for value in [1_u8, 3, 5] {
        store.insert(raw_key(value), IndexEntryValue::presence());
    }
    store
        .fold_journaled_materialized_view()
        .expect("canonical index seed should fold");

    // Overlay: add 0 and 4, rewrite 5, tombstone 1.
    for value in [0_u8, 4, 5] {
        store.insert(raw_key(value), IndexEntryValue::presence());
    }
    store.remove(&raw_key(1));

    let lower = Bound::Included(raw_key(0));
    let upper = Bound::Included(raw_key(5));

    let scenarios = [
        (
            Direction::Asc,
            vec![0_u8, 3],
            "asc journaled index range traversal should succeed",
            "mixed journaled index range traversal should preserve early stop without materializing a snapshot",
        ),
        (
            Direction::Desc,
            vec![5, 4],
            "desc journaled index range traversal should succeed",
            "mixed reverse journaled index range traversal should preserve early stop without materializing a snapshot",
        ),
    ];

    for (direction, expected, traversal_msg, snapshot_msg) in scenarios {
        reset_journaled_snapshot_call_count_for_tests();
        let mut seen = Vec::new();
        store
            .visit_journaled_entries_in_range((&lower, &upper), direction, |key, _value| {
                seen.push(key.as_bytes()[0]);
                // Ask the traversal to stop after the second entry.
                Ok::<_, Infallible>(seen.len() == 2)
            })
            .expect(traversal_msg);
        assert_eq!(seen, expected);
        assert_eq!(journaled_snapshot_call_count_for_tests(), 0, "{snapshot_msg}");
    }
}
997}