1use crate::db::{
7 direction::Direction,
8 index::{
9 IndexEntryValue, IndexId, IndexKeyKind, cardinality::IndexPrefixCardinality,
10 key::RawIndexStoreKey,
11 },
12 ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
13};
14
15use candid::CandidType;
16use ic_memory::stable_structures::{
17 BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
18};
19use serde::Deserialize;
20#[cfg(any(test, feature = "diagnostics"))]
21use std::cell::Cell;
22use std::collections::{BTreeMap as HeapBTreeMap, BTreeSet};
23use std::ops::Bound;
24
// Test-only, per-thread counter. `record_journaled_snapshot_call` increments it
// and the `*_for_tests` helpers below reset and read it, so tests can check
// whether a code path materialized a journaled snapshot.
#[cfg(test)]
thread_local! {
    static JOURNALED_SNAPSHOT_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
}
29
// Per-thread diagnostics counters, compiled only with the `diagnostics` feature.
// They are advanced by the `record_index_store_*` helpers and read through the
// `IndexStore::current_*_count` accessors.
#[cfg(feature = "diagnostics")]
thread_local! {
    static INDEX_STORE_GET_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
    static INDEX_STORE_RANGE_SCAN_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
    static INDEX_STORE_ENTRY_READ_COUNT: Cell<u64> = const { Cell::new(0) };
}
36
/// Adds one to the per-thread `get` call counter, saturating at `u64::MAX`.
#[cfg(feature = "diagnostics")]
fn record_index_store_get_call() {
    let next = INDEX_STORE_GET_CALL_COUNT.get().saturating_add(1);
    INDEX_STORE_GET_CALL_COUNT.set(next);
}
43
/// Adds one to the per-thread range-scan call counter, saturating at `u64::MAX`.
#[cfg(feature = "diagnostics")]
fn record_index_store_range_scan_call() {
    let next = INDEX_STORE_RANGE_SCAN_CALL_COUNT.get().saturating_add(1);
    INDEX_STORE_RANGE_SCAN_CALL_COUNT.set(next);
}
50
/// Adds one to the per-thread entry-read counter, saturating at `u64::MAX`.
#[cfg(feature = "diagnostics")]
fn record_index_store_entry_read() {
    let next = INDEX_STORE_ENTRY_READ_COUNT.get().saturating_add(1);
    INDEX_STORE_ENTRY_READ_COUNT.set(next);
}
57
58fn visit_index_store_entry<E>(
59 key: &RawIndexStoreKey,
60 value: &IndexEntryValue,
61 visit: &mut impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<bool, E>,
62) -> Result<bool, E> {
63 #[cfg(feature = "diagnostics")]
64 record_index_store_entry_read();
65
66 visit(key, value)
67}
68
/// Test-only: adds one to the per-thread snapshot call counter, saturating at
/// `u64::MAX`.
#[cfg(test)]
fn record_journaled_snapshot_call() {
    let next = JOURNALED_SNAPSHOT_CALL_COUNT.get().saturating_add(1);
    JOURNALED_SNAPSHOT_CALL_COUNT.set(next);
}
75
/// Test-only: resets the per-thread snapshot call counter to zero.
#[cfg(test)]
fn reset_journaled_snapshot_call_count_for_tests() {
    JOURNALED_SNAPSHOT_CALL_COUNT.set(0);
}
80
/// Test-only: returns the current per-thread snapshot call counter.
#[cfg(test)]
fn journaled_snapshot_call_count_for_tests() -> u64 {
    JOURNALED_SNAPSHOT_CALL_COUNT.get()
}
85
/// Lifecycle state recorded on an [`IndexStore`].
///
/// `Ready` is the `Default` value and is also what both store constructors
/// start with.
#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
pub enum IndexState {
    Building,
    #[default]
    Ready,
    Dropping,
}
100
impl IndexState {
    /// Returns the lowercase label for this state: `"building"`, `"ready"`,
    /// or `"dropping"`.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Building => "building",
            Self::Ready => "ready",
            Self::Dropping => "dropping",
        }
    }
}
112
/// Ordered key/value index store with a pluggable backend and bookkeeping for
/// generation, lifecycle state, and prefix cardinality.
pub struct IndexStore {
    // Where the entries live; see `IndexStoreBackend`.
    pub(super) backend: IndexStoreBackend,
    // Bumped (saturating) by `insert`, `remove`, `clear`, and the journaled fold.
    generation: u64,
    // Lifecycle marker changed only through the `mark_*` methods.
    state: IndexState,
    // Prefix counts maintained alongside every mutation of `backend`.
    prefix_cardinality: IndexPrefixCardinality,
}
127
/// Storage layout behind an [`IndexStore`].
pub(super) enum IndexStoreBackend {
    // All entries in one in-memory ordered map.
    Heap(HeapBTreeMap<RawIndexStoreKey, IndexEntryValue>),
    // A stable-memory map plus an in-memory overlay. Point reads check
    // `tombstones` first, then `live`, then `canonical`.
    Journaled {
        // Stable-memory map; rewritten only by `fold_journaled_materialized_view`.
        canonical:
            StableBTreeMap<RawIndexStoreKey, IndexEntryValue, VirtualMemory<DefaultMemoryImpl>>,
        // Entries written since the last fold; they shadow `canonical`.
        live: HeapBTreeMap<RawIndexStoreKey, IndexEntryValue>,
        // Keys removed since the last fold; they hide `canonical` entries.
        tombstones: BTreeSet<RawIndexStoreKey>,
    },
}
137
/// Control value returned by `IndexStore::visit_entries` callbacks.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) enum IndexStoreVisit {
    // Keep yielding entries.
    Continue,
    // End the traversal after the current entry.
    Stop,
}
144
impl IndexStoreVisit {
    /// Returns `true` only for [`IndexStoreVisit::Stop`].
    const fn should_stop(self) -> bool {
        matches!(self, Self::Stop)
    }
}
150
151impl IndexStore {
    /// Creates an empty heap-backed store at generation 0 in the `Ready` state.
    #[must_use]
    pub const fn init_heap() -> Self {
        Self {
            backend: IndexStoreBackend::Heap(HeapBTreeMap::new()),
            generation: 0,
            state: IndexState::Ready,
            prefix_cardinality: IndexPrefixCardinality::synchronized_empty(),
        }
    }
162
163 #[must_use]
168 pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
169 let mut store = Self {
170 backend: IndexStoreBackend::Journaled {
171 canonical: StableBTreeMap::init(memory),
172 live: HeapBTreeMap::new(),
173 tombstones: BTreeSet::new(),
174 },
175 generation: 0,
176 state: IndexState::Ready,
177 prefix_cardinality: IndexPrefixCardinality::synchronized_empty(),
178 };
179 store.rebuild_prefix_cardinality_from_entries(Some(0));
180 store
181 }
182
183 pub(in crate::db) fn visit_entries<E>(
186 &self,
187 mut visitor: impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<IndexStoreVisit, E>,
188 ) -> Result<(), E> {
189 match &self.backend {
190 IndexStoreBackend::Heap(map) => {
191 for (key, value) in map {
192 #[cfg(feature = "diagnostics")]
193 record_index_store_entry_read();
194
195 if visitor(key, value)?.should_stop() {
196 return Ok(());
197 }
198 }
199 }
200 IndexStoreBackend::Journaled {
201 canonical: _,
202 live: _,
203 tombstones: _,
204 } => self.visit_journaled_entries_in_range(
205 (&Bound::Unbounded, &Bound::Unbounded),
206 Direction::Asc,
207 |key, value| visitor(key, value).map(IndexStoreVisit::should_stop),
208 )?,
209 }
210
211 Ok(())
212 }
213
214 pub(in crate::db) fn get(&self, key: &RawIndexStoreKey) -> Option<IndexEntryValue> {
215 #[cfg(feature = "diagnostics")]
216 record_index_store_get_call();
217
218 match &self.backend {
219 IndexStoreBackend::Heap(map) => map.get(key).cloned(),
220 IndexStoreBackend::Journaled { .. } => Self::journaled_get(&self.backend, key),
221 }
222 }
223
224 pub fn len(&self) -> u64 {
225 match &self.backend {
226 IndexStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
227 IndexStoreBackend::Journaled { .. } => {
228 let mut count = 0_u64;
229 let _: Result<(), std::convert::Infallible> = self.visit_entries(|_key, _value| {
230 count = count.saturating_add(1);
231 Ok(IndexStoreVisit::Continue)
232 });
233 count
234 }
235 }
236 }
237
238 pub fn is_empty(&self) -> bool {
239 match &self.backend {
240 IndexStoreBackend::Heap(map) => map.is_empty(),
241 IndexStoreBackend::Journaled { .. } => {
242 let mut empty = true;
243 let _: Result<(), std::convert::Infallible> = self.visit_entries(|_key, _value| {
244 empty = false;
245 Ok(IndexStoreVisit::Stop)
246 });
247 empty
248 }
249 }
250 }
251
    /// Returns the store generation, which is bumped by every mutating
    /// operation in this file.
    #[must_use]
    pub(in crate::db) const fn generation(&self) -> u64 {
        self.generation
    }
256
    /// Returns the current lifecycle state.
    #[must_use]
    pub(in crate::db) const fn state(&self) -> IndexState {
        self.state
    }
262
    /// Asks the prefix-cardinality tracker for an exact count of entries under
    /// `components` for the given index and key kind.
    ///
    /// Pure delegation to `IndexPrefixCardinality::exact_count`. The tests in
    /// this file show `None` being returned when `data_generation` does not
    /// match the generation last stamped via
    /// `mark_prefix_cardinality_data_generation`.
    #[must_use]
    pub(in crate::db) fn exact_prefix_cardinality(
        &self,
        data_generation: u64,
        key_kind: IndexKeyKind,
        index_id: IndexId,
        components: &[Vec<u8>],
    ) -> Option<u64> {
        self.prefix_cardinality
            .exact_count(data_generation, key_kind, index_id, components)
    }
276
    /// Stamps the prefix-cardinality tracker as synchronized at `generation`.
    pub(in crate::db) const fn mark_prefix_cardinality_data_generation(&mut self, generation: u64) {
        self.prefix_cardinality.mark_synchronized(generation);
    }
282
    /// Sets the lifecycle state to `IndexState::Building`.
    pub(in crate::db) const fn mark_building(&mut self) {
        self.state = IndexState::Building;
    }
288
    /// Sets the lifecycle state to `IndexState::Ready`.
    pub(in crate::db) const fn mark_ready(&mut self) {
        self.state = IndexState::Ready;
    }
293
    /// Sets the lifecycle state to `IndexState::Dropping`.
    pub(in crate::db) const fn mark_dropping(&mut self) {
        self.state = IndexState::Dropping;
    }
298
299 pub(crate) fn insert(
300 &mut self,
301 key: RawIndexStoreKey,
302 entry: IndexEntryValue,
303 ) -> Option<IndexEntryValue> {
304 let previous_journaled = if matches!(self.backend, IndexStoreBackend::Journaled { .. }) {
305 self.get(&key)
306 } else {
307 None
308 };
309 let cardinality_key = key.clone();
310 let previous = match &mut self.backend {
311 IndexStoreBackend::Heap(map) => map.insert(key, entry.clone()),
312 IndexStoreBackend::Journaled {
313 live, tombstones, ..
314 } => {
315 tombstones.remove(&key);
316 live.insert(key, entry.clone());
317 previous_journaled
318 }
319 };
320 self.prefix_cardinality
321 .apply_insert(&cardinality_key, previous.as_ref(), &entry);
322 self.bump_generation();
323 previous
324 }
325
326 pub(crate) fn remove(&mut self, key: &RawIndexStoreKey) -> Option<IndexEntryValue> {
327 let previous_journaled = if matches!(self.backend, IndexStoreBackend::Journaled { .. }) {
328 self.get(key)
329 } else {
330 None
331 };
332 let previous = match &mut self.backend {
333 IndexStoreBackend::Heap(map) => map.remove(key),
334 IndexStoreBackend::Journaled {
335 live, tombstones, ..
336 } => {
337 live.remove(key);
338 tombstones.insert(key.clone());
339 previous_journaled
340 }
341 };
342 self.prefix_cardinality.apply_remove(key, previous.as_ref());
343 self.bump_generation();
344 previous
345 }
346
347 pub fn clear(&mut self) {
348 match &mut self.backend {
349 IndexStoreBackend::Heap(map) => map.clear(),
350 IndexStoreBackend::Journaled {
351 canonical,
352 live,
353 tombstones,
354 } => {
355 live.clear();
356 tombstones.clear();
357 for entry in canonical.iter() {
358 tombstones.insert(entry.key().clone());
359 }
360 }
361 }
362 self.prefix_cardinality.clear_unsynchronized();
363 self.bump_generation();
364 }
365
    /// Rewrites the canonical map so that it holds exactly the currently
    /// visible entries, then empties the live overlay and the tombstone set.
    ///
    /// # Errors
    ///
    /// Returns `InternalError::store_invariant()` when the backend is not
    /// `Journaled`.
    pub(in crate::db) fn fold_journaled_materialized_view(
        &mut self,
    ) -> Result<(), crate::error::InternalError> {
        // Snapshot the visible view before taking the mutable borrow below.
        let entries = Self::journaled_entries_snapshot_for_fold(&self.backend);
        let IndexStoreBackend::Journaled {
            canonical,
            live,
            tombstones,
        } = &mut self.backend
        else {
            return Err(crate::error::InternalError::store_invariant());
        };

        // Replace the canonical contents wholesale with the snapshot.
        canonical.clear_new();
        for (key, value) in entries {
            canonical.insert(key, value);
        }
        live.clear();
        tombstones.clear();
        // Read the synchronized generation before the rebuild clears it, so the
        // recounted cardinality is re-stamped with the same value.
        let data_generation = self.prefix_cardinality.synchronized_generation();
        self.rebuild_prefix_cardinality_from_entries(data_generation);
        self.bump_generation();

        Ok(())
    }
393
394 pub fn memory_bytes(&self) -> u64 {
396 let mut bytes = 0u64;
397 let _: Result<(), std::convert::Infallible> = self.visit_entries(|key, value| {
398 bytes = bytes.saturating_add(key.as_bytes().len() as u64 + value.len() as u64);
399 Ok(IndexStoreVisit::Continue)
400 });
401 bytes
402 }
403
404 #[cfg(feature = "diagnostics")]
406 pub(in crate::db) fn current_get_call_count() -> u64 {
407 INDEX_STORE_GET_CALL_COUNT.with(Cell::get)
408 }
409
410 #[cfg(feature = "diagnostics")]
412 pub(in crate::db) fn current_range_scan_call_count() -> u64 {
413 INDEX_STORE_RANGE_SCAN_CALL_COUNT.with(Cell::get)
414 }
415
416 #[cfg(feature = "diagnostics")]
418 pub(in crate::db) fn current_entry_read_count() -> u64 {
419 INDEX_STORE_ENTRY_READ_COUNT.with(Cell::get)
420 }
421
    /// Advances the range-scan diagnostics counter; exposed so sibling code in
    /// `crate::db::index` can record a scan.
    #[cfg(feature = "diagnostics")]
    pub(in crate::db::index) fn record_range_scan_call() {
        record_index_store_range_scan_call();
    }
426
    /// Advances the store generation by one, saturating at `u64::MAX`.
    const fn bump_generation(&mut self) {
        self.generation = self.generation.saturating_add(1);
    }
430
431 fn rebuild_prefix_cardinality_from_entries(&mut self, data_generation: Option<u64>) {
432 self.prefix_cardinality.clear_unsynchronized();
433 let entries = Self::entries_snapshot_for_cardinality(&self.backend);
434 for (key, value) in &entries {
435 self.prefix_cardinality.apply_insert(key, None, value);
436 }
437 if let Some(data_generation) = data_generation {
438 self.prefix_cardinality.mark_synchronized(data_generation);
439 }
440 }
441
442 fn entries_snapshot_for_cardinality(
443 backend: &IndexStoreBackend,
444 ) -> HeapBTreeMap<RawIndexStoreKey, IndexEntryValue> {
445 match backend {
446 IndexStoreBackend::Heap(map) => map.clone(),
447 IndexStoreBackend::Journaled { .. } => {
448 Self::journaled_entries_snapshot_for_fold(backend)
449 }
450 }
451 }
452
453 #[cfg(test)]
454 #[must_use]
455 pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
456 match &self.backend {
457 IndexStoreBackend::Journaled { canonical: map, .. } => map.len(),
458 IndexStoreBackend::Heap(_) => 0,
459 }
460 }
461
462 fn journaled_get(
463 backend: &IndexStoreBackend,
464 key: &RawIndexStoreKey,
465 ) -> Option<IndexEntryValue> {
466 let IndexStoreBackend::Journaled {
467 canonical,
468 live,
469 tombstones,
470 } = backend
471 else {
472 return None;
473 };
474
475 if tombstones.contains(key) {
476 return None;
477 }
478 live.get(key).cloned().or_else(|| canonical.get(key))
479 }
480
481 pub(super) fn journaled_entries_snapshot_for_fold(
482 backend: &IndexStoreBackend,
483 ) -> HeapBTreeMap<RawIndexStoreKey, IndexEntryValue> {
484 #[cfg(test)]
485 record_journaled_snapshot_call();
486
487 let IndexStoreBackend::Journaled {
488 canonical,
489 live,
490 tombstones,
491 } = backend
492 else {
493 return HeapBTreeMap::new();
494 };
495
496 let mut entries = HeapBTreeMap::new();
497 for entry in canonical.iter() {
498 let key = entry.key().clone();
499 if !tombstones.contains(&key) {
500 entries.insert(key, entry.value());
501 }
502 }
503 for (key, value) in live {
504 if !tombstones.contains(key) {
505 entries.insert(key.clone(), value.clone());
506 }
507 }
508
509 entries
510 }
511
    /// Streams the visible journaled entries within `bounds` to `visit`, in
    /// ascending or descending key order.
    ///
    /// `visit` returns `Ok(true)` to stop the traversal; its first `Err` is
    /// propagated. Heap backends yield nothing. No snapshot is materialized:
    /// the canonical map and the live overlay are read directly, and merged
    /// through `visit_ordered_overlay` when both can contribute.
    pub(super) fn visit_journaled_entries_in_range<E>(
        &self,
        bounds: (&Bound<RawIndexStoreKey>, &Bound<RawIndexStoreKey>),
        direction: Direction,
        mut visit: impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<bool, E>,
    ) -> Result<(), E> {
        let IndexStoreBackend::Journaled {
            canonical,
            live,
            tombstones,
        } = &self.backend
        else {
            return Ok(());
        };

        // Owned copies of the bounds: the range calls below take them by value.
        let lower = bounds.0.clone();
        let upper = bounds.1.clone();
        match direction {
            // Fast path: nothing canonical, so the live overlay is the whole view.
            // This arm does not consult `tombstones`.
            // NOTE(review): relies on `live` never holding a tombstoned key, which
            // `insert`/`remove`/`clear` in this file maintain — confirm for any
            // other writer of `backend`.
            Direction::Asc if canonical.is_empty() => {
                for (key, value) in live.range((lower, upper)) {
                    if visit_index_store_entry(key, value, &mut visit)? {
                        return Ok(());
                    }
                }
            }
            Direction::Desc if canonical.is_empty() => {
                for (key, value) in live.range((lower, upper)).rev() {
                    if visit_index_store_entry(key, value, &mut visit)? {
                        return Ok(());
                    }
                }
            }
            // Fast path: no overlay at all, so the canonical map is the whole view.
            Direction::Asc if live.is_empty() && tombstones.is_empty() => {
                for entry in canonical.range((lower, upper)) {
                    if visit_index_store_entry(entry.key(), &entry.value(), &mut visit)? {
                        return Ok(());
                    }
                }
            }
            Direction::Desc if live.is_empty() && tombstones.is_empty() => {
                for entry in canonical.range((lower, upper)).rev() {
                    if visit_index_store_entry(entry.key(), &entry.value(), &mut visit)? {
                        return Ok(());
                    }
                }
            }
            // General case: merge both ordered sources, skipping tombstoned keys
            // on either side.
            Direction::Asc => {
                visit_ordered_overlay(
                    canonical.range((lower.clone(), upper.clone())),
                    live.range((lower, upper)),
                    direction,
                    |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
                    |canonical_entry| !tombstones.contains(canonical_entry.key()),
                    |live_entry| !tombstones.contains(live_entry.0),
                    |entry| {
                        let should_stop = match entry {
                            OrderedOverlayEntry::Canonical(canonical_entry) => {
                                visit_index_store_entry(
                                    canonical_entry.key(),
                                    &canonical_entry.value(),
                                    &mut visit,
                                )?
                            }
                            OrderedOverlayEntry::Live((key, value)) => {
                                visit_index_store_entry(key, value, &mut visit)?
                            }
                        };
                        Ok(if should_stop {
                            OrderedOverlayVisit::Stop
                        } else {
                            OrderedOverlayVisit::Continue
                        })
                    },
                )?;
            }
            // Same merge as above with both sources reversed.
            Direction::Desc => {
                visit_ordered_overlay(
                    canonical.range((lower.clone(), upper.clone())).rev(),
                    live.range((lower, upper)).rev(),
                    direction,
                    |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
                    |canonical_entry| !tombstones.contains(canonical_entry.key()),
                    |live_entry| !tombstones.contains(live_entry.0),
                    |entry| {
                        let should_stop = match entry {
                            OrderedOverlayEntry::Canonical(canonical_entry) => {
                                visit_index_store_entry(
                                    canonical_entry.key(),
                                    &canonical_entry.value(),
                                    &mut visit,
                                )?
                            }
                            OrderedOverlayEntry::Live((key, value)) => {
                                visit_index_store_entry(key, value, &mut visit)?
                            }
                        };
                        Ok(if should_stop {
                            OrderedOverlayVisit::Stop
                        } else {
                            OrderedOverlayVisit::Continue
                        })
                    },
                )?;
            }
        }

        Ok(())
    }
620}
621
622#[cfg(test)]
623mod tests {
624 use super::*;
625 use crate::{
626 db::{
627 direction::Direction,
628 index::{IndexId, IndexKey, IndexKeyKind},
629 key_taxonomy::{PrimaryKeyComponent, PrimaryKeyValue},
630 },
631 testing::test_memory,
632 traits::Storable,
633 types::EntityTag,
634 };
635 use std::{borrow::Cow, convert::Infallible};
636
637 fn raw_key(value: u8) -> RawIndexStoreKey {
638 <RawIndexStoreKey as Storable>::from_bytes(Cow::Owned(vec![value]))
639 }
640
    /// Builds a raw key of kind `IndexKeyKind::User` for `index_id`,
    /// `components`, and `primary_key`.
    fn indexed_raw_key(
        index_id: &IndexId,
        components: Vec<Vec<u8>>,
        primary_key: u64,
    ) -> RawIndexStoreKey {
        indexed_raw_key_with_kind(index_id, IndexKeyKind::User, components, primary_key)
    }
648
649 fn indexed_raw_key_with_kind(
650 index_id: &IndexId,
651 key_kind: IndexKeyKind,
652 components: Vec<Vec<u8>>,
653 primary_key: u64,
654 ) -> RawIndexStoreKey {
655 IndexKey::new_from_components_with_primary_key_value(
656 index_id,
657 key_kind,
658 components.as_slice(),
659 &PrimaryKeyValue::from(PrimaryKeyComponent::Nat64(primary_key)),
660 )
661 .to_raw()
662 }
663
664 fn malformed_index_entry_value() -> IndexEntryValue {
665 <IndexEntryValue as Storable>::from_bytes(Cow::Owned(vec![0xFF]))
666 }
667
668 fn missing_index_entry_value() -> IndexEntryValue {
669 <IndexEntryValue as Storable>::from_bytes(Cow::Owned(vec![1]))
670 }
671
    // Prefix counts are only served after a data generation has been stamped,
    // and only to callers that ask with that same generation.
    #[test]
    fn index_prefix_cardinality_requires_explicit_data_generation_sync() {
        let index_id = IndexId::new(EntityTag::new(0xCA7D), 1);
        let collection = b"collection-a".to_vec();
        let draft = b"Draft".to_vec();
        let review = b"Review".to_vec();
        let mut store = IndexStore::init_heap();

        // Two entries under (collection, Draft) and one under (collection, Review).
        store.insert(
            indexed_raw_key(&index_id, vec![collection.clone(), draft.clone()], 1),
            IndexEntryValue::presence(),
        );
        store.insert(
            indexed_raw_key(&index_id, vec![collection.clone(), draft.clone()], 2),
            IndexEntryValue::presence(),
        );
        store.insert(
            indexed_raw_key(&index_id, vec![collection.clone(), review.clone()], 3),
            IndexEntryValue::presence(),
        );

        // Before any stamp, no count is available.
        assert_eq!(
            store.exact_prefix_cardinality(
                0,
                IndexKeyKind::User,
                index_id,
                std::slice::from_ref(&collection),
            ),
            None,
            "raw index mutations must not be trusted until row generation sync is stamped",
        );

        store.mark_prefix_cardinality_data_generation(7);

        // After stamping generation 7, counts are served for generation 7.
        assert_eq!(
            store.exact_prefix_cardinality(
                7,
                IndexKeyKind::User,
                index_id,
                std::slice::from_ref(&collection),
            ),
            Some(3),
        );
        assert_eq!(
            store.exact_prefix_cardinality(
                7,
                IndexKeyKind::User,
                index_id,
                &[collection.clone(), draft],
            ),
            Some(2),
        );
        // A different generation gets no answer.
        assert_eq!(
            store.exact_prefix_cardinality(8, IndexKeyKind::User, index_id, &[collection, review],),
            None,
            "row generation drift should force the caller to use the existing-row fallback",
        );
    }
730
    // System-kind key writes and removals (well-formed or malformed) leave a
    // synchronized user-prefix count readable; a further user-key insert does not.
    #[test]
    fn index_prefix_cardinality_ignores_system_index_mutations() {
        let user_index_id = IndexId::new(EntityTag::new(0xCA7D), 1);
        let system_index_id = IndexId::new(EntityTag::new(0xCA7D), 2);
        let collection = b"collection-a".to_vec();
        let draft = b"Draft".to_vec();
        let system_component = b"reverse-edge".to_vec();
        let mut store = IndexStore::init_heap();

        // Baseline: one user entry, stamped at generation 7.
        store.insert(
            indexed_raw_key(&user_index_id, vec![collection.clone(), draft.clone()], 1),
            IndexEntryValue::presence(),
        );
        store.mark_prefix_cardinality_data_generation(7);

        assert_eq!(
            store.exact_prefix_cardinality(
                7,
                IndexKeyKind::User,
                user_index_id,
                &[collection.clone(), draft.clone()],
            ),
            Some(1),
        );

        // Insert, then remove, a well-formed system-kind entry.
        let system_key = indexed_raw_key_with_kind(
            &system_index_id,
            IndexKeyKind::System,
            vec![system_component],
            1,
        );
        store.insert(system_key.clone(), IndexEntryValue::presence());
        assert_eq!(
            store.exact_prefix_cardinality(
                7,
                IndexKeyKind::User,
                user_index_id,
                &[collection.clone(), draft.clone()],
            ),
            Some(1),
            "system index writes must not invalidate synchronized user-prefix cardinality",
        );

        store.remove(&system_key);
        assert_eq!(
            store.exact_prefix_cardinality(
                7,
                IndexKeyKind::User,
                user_index_id,
                &[collection.clone(), draft.clone()],
            ),
            Some(1),
            "system index removals must not invalidate synchronized user-prefix cardinality",
        );

        // Same again with a malformed payload under a system-kind key.
        let malformed_system_key = indexed_raw_key_with_kind(
            &system_index_id,
            IndexKeyKind::System,
            vec![b"malformed-reverse-edge".to_vec()],
            2,
        );
        store.insert(malformed_system_key.clone(), malformed_index_entry_value());
        assert_eq!(
            store.exact_prefix_cardinality(
                7,
                IndexKeyKind::User,
                user_index_id,
                &[collection.clone(), draft.clone()],
            ),
            Some(1),
            "malformed system index payloads must not invalidate user-prefix cardinality",
        );

        store.remove(&malformed_system_key);
        assert_eq!(
            store.exact_prefix_cardinality(
                7,
                IndexKeyKind::User,
                user_index_id,
                &[collection.clone(), draft],
            ),
            Some(1),
            "malformed system index removals must not invalidate user-prefix cardinality",
        );

        // A real user-key insert makes the stamped count unavailable again.
        let review = b"Review".to_vec();
        store.insert(
            indexed_raw_key(&user_index_id, vec![collection.clone(), review.clone()], 2),
            IndexEntryValue::presence(),
        );
        assert_eq!(
            store.exact_prefix_cardinality(
                7,
                IndexKeyKind::User,
                user_index_id,
                &[collection, review]
            ),
            None,
            "user-prefix count changes must still require a fresh row-generation stamp",
        );
    }
832
    // Inserting and removing a user-key entry whose payload is the "missing"
    // value leaves the synchronized prefix count unchanged and still readable.
    #[test]
    fn index_prefix_cardinality_ignores_missing_user_index_mutations() {
        let index_id = IndexId::new(EntityTag::new(0xCA7D), 1);
        let collection = b"collection-a".to_vec();
        let draft = b"Draft".to_vec();
        let mut store = IndexStore::init_heap();

        // Baseline: one present entry, stamped at generation 7.
        store.insert(
            indexed_raw_key(&index_id, vec![collection.clone(), draft.clone()], 1),
            IndexEntryValue::presence(),
        );
        store.mark_prefix_cardinality_data_generation(7);

        let stale_key = indexed_raw_key(&index_id, vec![collection.clone(), draft.clone()], 2);
        store.insert(stale_key.clone(), missing_index_entry_value());
        assert_eq!(
            store.exact_prefix_cardinality(
                7,
                IndexKeyKind::User,
                index_id,
                &[collection.clone(), draft.clone()],
            ),
            Some(1),
            "missing user index entries must not affect synchronized prefix cardinality",
        );

        store.remove(&stale_key);
        assert_eq!(
            store.exact_prefix_cardinality(7, IndexKeyKind::User, index_id, &[collection, draft],),
            Some(1),
            "missing user index removals must not affect synchronized prefix cardinality",
        );
    }
866
    // Each diagnostics counter is checked as a delta around the operation under
    // test, so earlier activity on the same thread does not affect the result.
    #[cfg(feature = "diagnostics")]
    #[test]
    fn index_store_diagnostic_counters_record_gets_range_scans_and_entry_reads() {
        let mut store = IndexStore::init_heap();
        store.insert(raw_key(7), IndexEntryValue::presence());
        store.insert(raw_key(9), IndexEntryValue::presence());

        // One hit and one miss: both count as get calls.
        let gets_before = IndexStore::current_get_call_count();
        assert_eq!(store.get(&raw_key(7)), Some(IndexEntryValue::presence()));
        assert_eq!(store.get(&raw_key(8)), None);

        assert_eq!(
            IndexStore::current_get_call_count().saturating_sub(gets_before),
            2,
            "diagnostic index-store get counter should count both hit and miss reads",
        );

        // `visit_raw_entries_in_range` is defined outside this file.
        let range_scans_before = IndexStore::current_range_scan_call_count();
        let lower = Bound::Included(raw_key(7));
        let upper = Bound::Included(raw_key(9));
        store
            .visit_raw_entries_in_range((&lower, &upper), Direction::Asc, |_key, _entry| Ok(false))
            .expect("raw index range visit should succeed");

        assert_eq!(
            IndexStore::current_range_scan_call_count().saturating_sub(range_scans_before),
            1,
            "diagnostic index-store range-scan counter should count one range traversal probe",
        );

        // A full traversal of the two stored entries yields two entry reads.
        let entries_before = IndexStore::current_entry_read_count();
        store
            .visit_entries(|_key, _entry| Ok::<_, Infallible>(IndexStoreVisit::Continue))
            .expect("index entry visit should succeed");

        assert_eq!(
            IndexStore::current_entry_read_count().saturating_sub(entries_before),
            2,
            "diagnostic index-store entry counter should count yielded traversal entries",
        );
    }
908
    // With canonical entries, live entries, and a tombstone all present, range
    // traversal yields the merged view in order, honors early stop, and never
    // calls the snapshot helper.
    #[test]
    fn journaled_mixed_index_range_traversal_streams_without_snapshot() {
        // Seed canonical with keys 1, 3, 5 by folding.
        let mut store = IndexStore::init_journaled(test_memory(93));
        for value in [1_u8, 3, 5] {
            store.insert(raw_key(value), IndexEntryValue::presence());
        }
        store
            .fold_journaled_materialized_view()
            .expect("canonical index seed should fold");

        // Overlay: new keys 0 and 4, key 5 rewritten in live, key 1 tombstoned.
        // Visible keys are therefore 0, 3, 4, 5.
        store.insert(raw_key(0), IndexEntryValue::presence());
        store.insert(raw_key(4), IndexEntryValue::presence());
        store.insert(raw_key(5), IndexEntryValue::presence());
        store.remove(&raw_key(1));

        let lower = Bound::Included(raw_key(0));
        let upper = Bound::Included(raw_key(5));

        // Ascending, stopping after two entries.
        reset_journaled_snapshot_call_count_for_tests();
        let mut asc = Vec::new();
        store
            .visit_journaled_entries_in_range((&lower, &upper), Direction::Asc, |key, _value| {
                asc.push(key.as_bytes()[0]);
                Ok::<_, Infallible>(asc.len() == 2)
            })
            .expect("asc journaled index range traversal should succeed");
        assert_eq!(asc, vec![0, 3]);
        assert_eq!(
            journaled_snapshot_call_count_for_tests(),
            0,
            "mixed journaled index range traversal should preserve early stop without materializing a snapshot",
        );

        // Descending, stopping after two entries.
        reset_journaled_snapshot_call_count_for_tests();
        let mut desc = Vec::new();
        store
            .visit_journaled_entries_in_range((&lower, &upper), Direction::Desc, |key, _value| {
                desc.push(key.as_bytes()[0]);
                Ok::<_, Infallible>(desc.len() == 2)
            })
            .expect("desc journaled index range traversal should succeed");
        assert_eq!(desc, vec![5, 4]);
        assert_eq!(
            journaled_snapshot_call_count_for_tests(),
            0,
            "mixed reverse journaled index range traversal should preserve early stop without materializing a snapshot",
        );
    }
957}