1use crate::db::{
7 direction::Direction,
8 index::{
9 IndexEntryValue, IndexId, IndexKeyKind, cardinality::IndexPrefixCardinality,
10 key::RawIndexStoreKey,
11 },
12 ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
13};
14
15use candid::CandidType;
16use ic_memory::stable_structures::{
17 BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
18};
19use serde::Deserialize;
20#[cfg(any(test, feature = "diagnostics"))]
21use std::cell::Cell;
22use std::collections::{BTreeMap as HeapBTreeMap, BTreeSet};
23use std::ops::Bound;
24
25#[cfg(test)]
26thread_local! {
27 static JOURNALED_SNAPSHOT_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
28}
29
30#[cfg(feature = "diagnostics")]
31thread_local! {
32 static INDEX_STORE_GET_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
33}
34
35#[cfg(any(test, feature = "diagnostics"))]
36thread_local! {
37 static INDEX_STORE_RANGE_SCAN_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
38 static INDEX_STORE_ENTRY_READ_COUNT: Cell<u64> = const { Cell::new(0) };
39 static INDEX_STORE_PREFIX_CARDINALITY_LOOKUP_COUNT: Cell<u64> = const { Cell::new(0) };
40}
41
42#[cfg(feature = "diagnostics")]
43fn record_index_store_get_call() {
44 INDEX_STORE_GET_CALL_COUNT.with(|count| {
45 count.set(count.get().saturating_add(1));
46 });
47}
48
49#[cfg(any(test, feature = "diagnostics"))]
50fn record_index_store_range_scan_call() {
51 INDEX_STORE_RANGE_SCAN_CALL_COUNT.with(|count| {
52 count.set(count.get().saturating_add(1));
53 });
54}
55
56#[cfg(any(test, feature = "diagnostics"))]
57fn record_index_store_entry_read() {
58 INDEX_STORE_ENTRY_READ_COUNT.with(|count| {
59 count.set(count.get().saturating_add(1));
60 });
61}
62
63#[cfg(any(test, feature = "diagnostics"))]
64fn record_index_store_prefix_cardinality_lookup() {
65 INDEX_STORE_PREFIX_CARDINALITY_LOOKUP_COUNT.with(|count| {
66 count.set(count.get().saturating_add(1));
67 });
68}
69
70fn visit_index_store_entry<E>(
71 key: &RawIndexStoreKey,
72 value: &IndexEntryValue,
73 visit: &mut impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<bool, E>,
74) -> Result<bool, E> {
75 #[cfg(any(test, feature = "diagnostics"))]
76 record_index_store_entry_read();
77
78 visit(key, value)
79}
80
81#[cfg(test)]
82fn record_journaled_snapshot_call() {
83 JOURNALED_SNAPSHOT_CALL_COUNT.with(|count| {
84 count.set(count.get().saturating_add(1));
85 });
86}
87
88#[cfg(test)]
89fn reset_journaled_snapshot_call_count_for_tests() {
90 JOURNALED_SNAPSHOT_CALL_COUNT.with(|count| count.set(0));
91}
92
93#[cfg(test)]
94fn journaled_snapshot_call_count_for_tests() -> u64 {
95 JOURNALED_SNAPSHOT_CALL_COUNT.with(Cell::get)
96}
97
98#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
106pub enum IndexState {
107 Building,
108 #[default]
109 Ready,
110 Dropping,
111}
112
113impl IndexState {
114 #[must_use]
116 pub const fn as_str(self) -> &'static str {
117 match self {
118 Self::Building => "building",
119 Self::Ready => "ready",
120 Self::Dropping => "dropping",
121 }
122 }
123}
124
125pub struct IndexStore {
134 pub(super) backend: IndexStoreBackend,
135 generation: u64,
136 state: IndexState,
137 prefix_cardinality: IndexPrefixCardinality,
138}
139
140pub(super) enum IndexStoreBackend {
141 Heap(HeapBTreeMap<RawIndexStoreKey, IndexEntryValue>),
142 Journaled {
143 canonical:
144 StableBTreeMap<RawIndexStoreKey, IndexEntryValue, VirtualMemory<DefaultMemoryImpl>>,
145 live: HeapBTreeMap<RawIndexStoreKey, IndexEntryValue>,
146 tombstones: BTreeSet<RawIndexStoreKey>,
147 },
148}
149
150#[derive(Clone, Copy, Debug, Eq, PartialEq)]
152pub(in crate::db) enum IndexStoreVisit {
153 Continue,
154 Stop,
155}
156
157impl IndexStoreVisit {
158 const fn should_stop(self) -> bool {
159 matches!(self, Self::Stop)
160 }
161}
162
163impl IndexStore {
164 #[must_use]
166 pub const fn init_heap() -> Self {
167 Self {
168 backend: IndexStoreBackend::Heap(HeapBTreeMap::new()),
169 generation: 0,
170 state: IndexState::Ready,
171 prefix_cardinality: IndexPrefixCardinality::synchronized_empty(),
172 }
173 }
174
175 #[must_use]
180 pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
181 let mut store = Self {
182 backend: IndexStoreBackend::Journaled {
183 canonical: StableBTreeMap::init(memory),
184 live: HeapBTreeMap::new(),
185 tombstones: BTreeSet::new(),
186 },
187 generation: 0,
188 state: IndexState::Ready,
189 prefix_cardinality: IndexPrefixCardinality::synchronized_empty(),
190 };
191 store.rebuild_prefix_cardinality_from_entries(Some(0));
192 store
193 }
194
195 pub(in crate::db) fn visit_entries<E>(
198 &self,
199 mut visitor: impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<IndexStoreVisit, E>,
200 ) -> Result<(), E> {
201 match &self.backend {
202 IndexStoreBackend::Heap(map) => {
203 for (key, value) in map {
204 #[cfg(any(test, feature = "diagnostics"))]
205 record_index_store_entry_read();
206
207 if visitor(key, value)?.should_stop() {
208 return Ok(());
209 }
210 }
211 }
212 IndexStoreBackend::Journaled {
213 canonical: _,
214 live: _,
215 tombstones: _,
216 } => self.visit_journaled_entries_in_range(
217 (&Bound::Unbounded, &Bound::Unbounded),
218 Direction::Asc,
219 |key, value| visitor(key, value).map(IndexStoreVisit::should_stop),
220 )?,
221 }
222
223 Ok(())
224 }
225
226 pub(in crate::db) fn get(&self, key: &RawIndexStoreKey) -> Option<IndexEntryValue> {
227 #[cfg(feature = "diagnostics")]
228 record_index_store_get_call();
229
230 match &self.backend {
231 IndexStoreBackend::Heap(map) => map.get(key).cloned(),
232 IndexStoreBackend::Journaled { .. } => Self::journaled_get(&self.backend, key),
233 }
234 }
235
236 pub fn len(&self) -> u64 {
237 match &self.backend {
238 IndexStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
239 IndexStoreBackend::Journaled { .. } => {
240 let mut count = 0_u64;
241 let _: Result<(), std::convert::Infallible> = self.visit_entries(|_key, _value| {
242 count = count.saturating_add(1);
243 Ok(IndexStoreVisit::Continue)
244 });
245 count
246 }
247 }
248 }
249
250 pub fn is_empty(&self) -> bool {
251 match &self.backend {
252 IndexStoreBackend::Heap(map) => map.is_empty(),
253 IndexStoreBackend::Journaled { .. } => {
254 let mut empty = true;
255 let _: Result<(), std::convert::Infallible> = self.visit_entries(|_key, _value| {
256 empty = false;
257 Ok(IndexStoreVisit::Stop)
258 });
259 empty
260 }
261 }
262 }
263
264 #[must_use]
265 pub(in crate::db) const fn generation(&self) -> u64 {
266 self.generation
267 }
268
269 #[must_use]
271 pub(in crate::db) const fn state(&self) -> IndexState {
272 self.state
273 }
274
275 #[must_use]
278 pub(in crate::db) fn exact_prefix_cardinality(
279 &self,
280 data_generation: u64,
281 key_kind: IndexKeyKind,
282 index_id: IndexId,
283 components: &[Vec<u8>],
284 ) -> Option<u64> {
285 #[cfg(any(test, feature = "diagnostics"))]
286 record_index_store_prefix_cardinality_lookup();
287
288 self.prefix_cardinality
289 .exact_count(data_generation, key_kind, index_id, components)
290 }
291
292 #[must_use]
295 pub(in crate::db) fn exact_prefix_cardinality_sum<'a>(
296 &self,
297 data_generation: u64,
298 key_kind: IndexKeyKind,
299 index_id: IndexId,
300 component_prefixes: impl IntoIterator<Item = &'a [Vec<u8>]>,
301 stop_after: Option<u64>,
302 ) -> Option<u64> {
303 #[cfg(any(test, feature = "diagnostics"))]
304 record_index_store_prefix_cardinality_lookup();
305
306 self.prefix_cardinality.exact_count_sum(
307 data_generation,
308 key_kind,
309 index_id,
310 component_prefixes,
311 stop_after,
312 )
313 }
314
315 pub(in crate::db) const fn mark_prefix_cardinality_data_generation(&mut self, generation: u64) {
318 self.prefix_cardinality.mark_synchronized(generation);
319 }
320
321 pub(in crate::db) const fn mark_building(&mut self) {
324 self.state = IndexState::Building;
325 }
326
327 pub(in crate::db) const fn mark_ready(&mut self) {
329 self.state = IndexState::Ready;
330 }
331
332 pub(in crate::db) const fn mark_dropping(&mut self) {
334 self.state = IndexState::Dropping;
335 }
336
337 pub(crate) fn insert(
338 &mut self,
339 key: RawIndexStoreKey,
340 entry: IndexEntryValue,
341 ) -> Option<IndexEntryValue> {
342 let previous_journaled = if matches!(self.backend, IndexStoreBackend::Journaled { .. }) {
343 self.get(&key)
344 } else {
345 None
346 };
347 let cardinality_key = key.clone();
348 let previous = match &mut self.backend {
349 IndexStoreBackend::Heap(map) => map.insert(key, entry.clone()),
350 IndexStoreBackend::Journaled {
351 live, tombstones, ..
352 } => {
353 tombstones.remove(&key);
354 live.insert(key, entry.clone());
355 previous_journaled
356 }
357 };
358 self.prefix_cardinality
359 .apply_insert(&cardinality_key, previous.as_ref(), &entry);
360 self.bump_generation();
361 previous
362 }
363
364 pub(crate) fn remove(&mut self, key: &RawIndexStoreKey) -> Option<IndexEntryValue> {
365 let previous_journaled = if matches!(self.backend, IndexStoreBackend::Journaled { .. }) {
366 self.get(key)
367 } else {
368 None
369 };
370 let previous = match &mut self.backend {
371 IndexStoreBackend::Heap(map) => map.remove(key),
372 IndexStoreBackend::Journaled {
373 live, tombstones, ..
374 } => {
375 live.remove(key);
376 tombstones.insert(key.clone());
377 previous_journaled
378 }
379 };
380 self.prefix_cardinality.apply_remove(key, previous.as_ref());
381 self.bump_generation();
382 previous
383 }
384
385 pub fn clear(&mut self) {
386 match &mut self.backend {
387 IndexStoreBackend::Heap(map) => map.clear(),
388 IndexStoreBackend::Journaled {
389 canonical,
390 live,
391 tombstones,
392 } => {
393 live.clear();
394 tombstones.clear();
395 for entry in canonical.iter() {
396 tombstones.insert(entry.key().clone());
397 }
398 }
399 }
400 self.prefix_cardinality.clear_unsynchronized();
401 self.bump_generation();
402 }
403
404 pub(in crate::db) fn fold_journaled_materialized_view(
407 &mut self,
408 ) -> Result<(), crate::error::InternalError> {
409 let entries = Self::journaled_entries_snapshot_for_fold(&self.backend);
410 let IndexStoreBackend::Journaled {
411 canonical,
412 live,
413 tombstones,
414 } = &mut self.backend
415 else {
416 return Err(crate::error::InternalError::store_invariant());
417 };
418
419 canonical.clear_new();
420 for (key, value) in entries {
421 canonical.insert(key, value);
422 }
423 live.clear();
424 tombstones.clear();
425 let data_generation = self.prefix_cardinality.synchronized_generation();
426 self.rebuild_prefix_cardinality_from_entries(data_generation);
427 self.bump_generation();
428
429 Ok(())
430 }
431
432 pub fn memory_bytes(&self) -> u64 {
434 let mut bytes = 0u64;
435 let _: Result<(), std::convert::Infallible> = self.visit_entries(|key, value| {
436 bytes = bytes.saturating_add(key.as_bytes().len() as u64 + value.len() as u64);
437 Ok(IndexStoreVisit::Continue)
438 });
439 bytes
440 }
441
442 #[cfg(feature = "diagnostics")]
444 pub(in crate::db) fn current_get_call_count() -> u64 {
445 INDEX_STORE_GET_CALL_COUNT.with(Cell::get)
446 }
447
448 #[cfg(any(test, feature = "diagnostics"))]
450 pub(in crate::db) fn current_range_scan_call_count() -> u64 {
451 INDEX_STORE_RANGE_SCAN_CALL_COUNT.with(Cell::get)
452 }
453
454 #[cfg(any(test, feature = "diagnostics"))]
456 pub(in crate::db) fn current_entry_read_count() -> u64 {
457 INDEX_STORE_ENTRY_READ_COUNT.with(Cell::get)
458 }
459
460 #[cfg(test)]
462 #[allow(dead_code)]
463 pub(in crate::db) fn current_prefix_cardinality_lookup_count() -> u64 {
464 INDEX_STORE_PREFIX_CARDINALITY_LOOKUP_COUNT.with(Cell::get)
465 }
466
467 #[cfg(any(test, feature = "diagnostics"))]
468 pub(in crate::db::index) fn record_range_scan_call() {
469 record_index_store_range_scan_call();
470 }
471
472 const fn bump_generation(&mut self) {
473 self.generation = self.generation.saturating_add(1);
474 }
475
476 fn rebuild_prefix_cardinality_from_entries(&mut self, data_generation: Option<u64>) {
477 self.prefix_cardinality.clear_unsynchronized();
478 let entries = Self::entries_snapshot_for_cardinality(&self.backend);
479 for (key, value) in &entries {
480 self.prefix_cardinality.apply_insert(key, None, value);
481 }
482 if let Some(data_generation) = data_generation {
483 self.prefix_cardinality.mark_synchronized(data_generation);
484 }
485 }
486
487 fn entries_snapshot_for_cardinality(
488 backend: &IndexStoreBackend,
489 ) -> HeapBTreeMap<RawIndexStoreKey, IndexEntryValue> {
490 match backend {
491 IndexStoreBackend::Heap(map) => map.clone(),
492 IndexStoreBackend::Journaled { .. } => {
493 Self::journaled_entries_snapshot_for_fold(backend)
494 }
495 }
496 }
497
498 #[cfg(test)]
499 #[must_use]
500 pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
501 match &self.backend {
502 IndexStoreBackend::Journaled { canonical: map, .. } => map.len(),
503 IndexStoreBackend::Heap(_) => 0,
504 }
505 }
506
507 fn journaled_get(
508 backend: &IndexStoreBackend,
509 key: &RawIndexStoreKey,
510 ) -> Option<IndexEntryValue> {
511 let IndexStoreBackend::Journaled {
512 canonical,
513 live,
514 tombstones,
515 } = backend
516 else {
517 return None;
518 };
519
520 if tombstones.contains(key) {
521 return None;
522 }
523 live.get(key).cloned().or_else(|| canonical.get(key))
524 }
525
526 pub(super) fn journaled_entries_snapshot_for_fold(
527 backend: &IndexStoreBackend,
528 ) -> HeapBTreeMap<RawIndexStoreKey, IndexEntryValue> {
529 #[cfg(test)]
530 record_journaled_snapshot_call();
531
532 let IndexStoreBackend::Journaled {
533 canonical,
534 live,
535 tombstones,
536 } = backend
537 else {
538 return HeapBTreeMap::new();
539 };
540
541 let mut entries = HeapBTreeMap::new();
542 for entry in canonical.iter() {
543 let key = entry.key().clone();
544 if !tombstones.contains(&key) {
545 entries.insert(key, entry.value());
546 }
547 }
548 for (key, value) in live {
549 if !tombstones.contains(key) {
550 entries.insert(key.clone(), value.clone());
551 }
552 }
553
554 entries
555 }
556
557 pub(super) fn visit_journaled_entries_in_range<E>(
558 &self,
559 bounds: (&Bound<RawIndexStoreKey>, &Bound<RawIndexStoreKey>),
560 direction: Direction,
561 mut visit: impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<bool, E>,
562 ) -> Result<(), E> {
563 let IndexStoreBackend::Journaled {
564 canonical,
565 live,
566 tombstones,
567 } = &self.backend
568 else {
569 return Ok(());
570 };
571
572 let lower = bounds.0.clone();
573 let upper = bounds.1.clone();
574 match direction {
575 Direction::Asc if canonical.is_empty() => {
576 for (key, value) in live.range((lower, upper)) {
577 if visit_index_store_entry(key, value, &mut visit)? {
578 return Ok(());
579 }
580 }
581 }
582 Direction::Desc if canonical.is_empty() => {
583 for (key, value) in live.range((lower, upper)).rev() {
584 if visit_index_store_entry(key, value, &mut visit)? {
585 return Ok(());
586 }
587 }
588 }
589 Direction::Asc if live.is_empty() && tombstones.is_empty() => {
590 for entry in canonical.range((lower, upper)) {
591 if visit_index_store_entry(entry.key(), &entry.value(), &mut visit)? {
592 return Ok(());
593 }
594 }
595 }
596 Direction::Desc if live.is_empty() && tombstones.is_empty() => {
597 for entry in canonical.range((lower, upper)).rev() {
598 if visit_index_store_entry(entry.key(), &entry.value(), &mut visit)? {
599 return Ok(());
600 }
601 }
602 }
603 Direction::Asc => {
604 visit_ordered_overlay(
605 canonical.range((lower.clone(), upper.clone())),
606 live.range((lower, upper)),
607 direction,
608 |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
609 |canonical_entry| !tombstones.contains(canonical_entry.key()),
610 |live_entry| !tombstones.contains(live_entry.0),
611 |entry| {
612 let should_stop = match entry {
613 OrderedOverlayEntry::Canonical(canonical_entry) => {
614 visit_index_store_entry(
615 canonical_entry.key(),
616 &canonical_entry.value(),
617 &mut visit,
618 )?
619 }
620 OrderedOverlayEntry::Live((key, value)) => {
621 visit_index_store_entry(key, value, &mut visit)?
622 }
623 };
624 Ok(if should_stop {
625 OrderedOverlayVisit::Stop
626 } else {
627 OrderedOverlayVisit::Continue
628 })
629 },
630 )?;
631 }
632 Direction::Desc => {
633 visit_ordered_overlay(
634 canonical.range((lower.clone(), upper.clone())).rev(),
635 live.range((lower, upper)).rev(),
636 direction,
637 |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
638 |canonical_entry| !tombstones.contains(canonical_entry.key()),
639 |live_entry| !tombstones.contains(live_entry.0),
640 |entry| {
641 let should_stop = match entry {
642 OrderedOverlayEntry::Canonical(canonical_entry) => {
643 visit_index_store_entry(
644 canonical_entry.key(),
645 &canonical_entry.value(),
646 &mut visit,
647 )?
648 }
649 OrderedOverlayEntry::Live((key, value)) => {
650 visit_index_store_entry(key, value, &mut visit)?
651 }
652 };
653 Ok(if should_stop {
654 OrderedOverlayVisit::Stop
655 } else {
656 OrderedOverlayVisit::Continue
657 })
658 },
659 )?;
660 }
661 }
662
663 Ok(())
664 }
665}
666
667#[cfg(test)]
668mod tests {
669 use super::*;
670 use crate::{
671 db::{
672 direction::Direction,
673 index::{IndexId, IndexKey, IndexKeyKind},
674 key_taxonomy::{PrimaryKeyComponent, PrimaryKeyValue},
675 },
676 testing::test_memory,
677 traits::Storable,
678 types::EntityTag,
679 };
680 use std::{borrow::Cow, convert::Infallible};
681
682 fn raw_key(value: u8) -> RawIndexStoreKey {
683 <RawIndexStoreKey as Storable>::from_bytes(Cow::Owned(vec![value]))
684 }
685
686 fn indexed_raw_key(
687 index_id: &IndexId,
688 components: Vec<Vec<u8>>,
689 primary_key: u64,
690 ) -> RawIndexStoreKey {
691 indexed_raw_key_with_kind(index_id, IndexKeyKind::User, components, primary_key)
692 }
693
694 fn indexed_raw_key_with_kind(
695 index_id: &IndexId,
696 key_kind: IndexKeyKind,
697 components: Vec<Vec<u8>>,
698 primary_key: u64,
699 ) -> RawIndexStoreKey {
700 IndexKey::new_from_components_with_primary_key_value(
701 index_id,
702 key_kind,
703 components.as_slice(),
704 &PrimaryKeyValue::from(PrimaryKeyComponent::Nat64(primary_key)),
705 )
706 .to_raw()
707 }
708
709 fn malformed_index_entry_value() -> IndexEntryValue {
710 <IndexEntryValue as Storable>::from_bytes(Cow::Owned(vec![0xFF]))
711 }
712
713 fn missing_index_entry_value() -> IndexEntryValue {
714 <IndexEntryValue as Storable>::from_bytes(Cow::Owned(vec![1]))
715 }
716
717 #[test]
718 fn index_prefix_cardinality_requires_explicit_data_generation_sync() {
719 let index_id = IndexId::new(EntityTag::new(0xCA7D), 1);
720 let collection = b"collection-a".to_vec();
721 let draft = b"Draft".to_vec();
722 let review = b"Review".to_vec();
723 let mut store = IndexStore::init_heap();
724
725 store.insert(
726 indexed_raw_key(&index_id, vec![collection.clone(), draft.clone()], 1),
727 IndexEntryValue::presence(),
728 );
729 store.insert(
730 indexed_raw_key(&index_id, vec![collection.clone(), draft.clone()], 2),
731 IndexEntryValue::presence(),
732 );
733 store.insert(
734 indexed_raw_key(&index_id, vec![collection.clone(), review.clone()], 3),
735 IndexEntryValue::presence(),
736 );
737
738 assert_eq!(
739 store.exact_prefix_cardinality(
740 0,
741 IndexKeyKind::User,
742 index_id,
743 std::slice::from_ref(&collection),
744 ),
745 None,
746 "raw index mutations must not be trusted until row generation sync is stamped",
747 );
748
749 store.mark_prefix_cardinality_data_generation(7);
750
751 assert_eq!(
752 store.exact_prefix_cardinality(
753 7,
754 IndexKeyKind::User,
755 index_id,
756 std::slice::from_ref(&collection),
757 ),
758 Some(3),
759 );
760 assert_eq!(
761 store.exact_prefix_cardinality(
762 7,
763 IndexKeyKind::User,
764 index_id,
765 &[collection.clone(), draft],
766 ),
767 Some(2),
768 );
769 assert_eq!(
770 store.exact_prefix_cardinality(8, IndexKeyKind::User, index_id, &[collection, review],),
771 None,
772 "row generation drift should force the caller to use the existing-row fallback",
773 );
774 }
775
776 #[test]
777 fn index_prefix_cardinality_ignores_system_index_mutations() {
778 let user_index_id = IndexId::new(EntityTag::new(0xCA7D), 1);
779 let system_index_id = IndexId::new(EntityTag::new(0xCA7D), 2);
780 let collection = b"collection-a".to_vec();
781 let draft = b"Draft".to_vec();
782 let system_component = b"reverse-edge".to_vec();
783 let mut store = IndexStore::init_heap();
784
785 store.insert(
786 indexed_raw_key(&user_index_id, vec![collection.clone(), draft.clone()], 1),
787 IndexEntryValue::presence(),
788 );
789 store.mark_prefix_cardinality_data_generation(7);
790
791 assert_eq!(
792 store.exact_prefix_cardinality(
793 7,
794 IndexKeyKind::User,
795 user_index_id,
796 &[collection.clone(), draft.clone()],
797 ),
798 Some(1),
799 );
800
801 let system_key = indexed_raw_key_with_kind(
802 &system_index_id,
803 IndexKeyKind::System,
804 vec![system_component],
805 1,
806 );
807 store.insert(system_key.clone(), IndexEntryValue::presence());
808 assert_eq!(
809 store.exact_prefix_cardinality(
810 7,
811 IndexKeyKind::User,
812 user_index_id,
813 &[collection.clone(), draft.clone()],
814 ),
815 Some(1),
816 "system index writes must not invalidate synchronized user-prefix cardinality",
817 );
818
819 store.remove(&system_key);
820 assert_eq!(
821 store.exact_prefix_cardinality(
822 7,
823 IndexKeyKind::User,
824 user_index_id,
825 &[collection.clone(), draft.clone()],
826 ),
827 Some(1),
828 "system index removals must not invalidate synchronized user-prefix cardinality",
829 );
830
831 let malformed_system_key = indexed_raw_key_with_kind(
832 &system_index_id,
833 IndexKeyKind::System,
834 vec![b"malformed-reverse-edge".to_vec()],
835 2,
836 );
837 store.insert(malformed_system_key.clone(), malformed_index_entry_value());
838 assert_eq!(
839 store.exact_prefix_cardinality(
840 7,
841 IndexKeyKind::User,
842 user_index_id,
843 &[collection.clone(), draft.clone()],
844 ),
845 Some(1),
846 "malformed system index payloads must not invalidate user-prefix cardinality",
847 );
848
849 store.remove(&malformed_system_key);
850 assert_eq!(
851 store.exact_prefix_cardinality(
852 7,
853 IndexKeyKind::User,
854 user_index_id,
855 &[collection.clone(), draft],
856 ),
857 Some(1),
858 "malformed system index removals must not invalidate user-prefix cardinality",
859 );
860
861 let review = b"Review".to_vec();
862 store.insert(
863 indexed_raw_key(&user_index_id, vec![collection.clone(), review.clone()], 2),
864 IndexEntryValue::presence(),
865 );
866 assert_eq!(
867 store.exact_prefix_cardinality(
868 7,
869 IndexKeyKind::User,
870 user_index_id,
871 &[collection, review]
872 ),
873 None,
874 "user-prefix count changes must still require a fresh row-generation stamp",
875 );
876 }
877
878 #[test]
879 fn index_prefix_cardinality_ignores_missing_user_index_mutations() {
880 let index_id = IndexId::new(EntityTag::new(0xCA7D), 1);
881 let collection = b"collection-a".to_vec();
882 let draft = b"Draft".to_vec();
883 let mut store = IndexStore::init_heap();
884
885 store.insert(
886 indexed_raw_key(&index_id, vec![collection.clone(), draft.clone()], 1),
887 IndexEntryValue::presence(),
888 );
889 store.mark_prefix_cardinality_data_generation(7);
890
891 let stale_key = indexed_raw_key(&index_id, vec![collection.clone(), draft.clone()], 2);
892 store.insert(stale_key.clone(), missing_index_entry_value());
893 assert_eq!(
894 store.exact_prefix_cardinality(
895 7,
896 IndexKeyKind::User,
897 index_id,
898 &[collection.clone(), draft.clone()],
899 ),
900 Some(1),
901 "missing user index entries must not affect synchronized prefix cardinality",
902 );
903
904 store.remove(&stale_key);
905 assert_eq!(
906 store.exact_prefix_cardinality(7, IndexKeyKind::User, index_id, &[collection, draft],),
907 Some(1),
908 "missing user index removals must not affect synchronized prefix cardinality",
909 );
910 }
911
912 #[cfg(feature = "diagnostics")]
913 #[test]
914 fn index_store_diagnostic_counters_record_gets_range_scans_and_entry_reads() {
915 let mut store = IndexStore::init_heap();
916 store.insert(raw_key(7), IndexEntryValue::presence());
917 store.insert(raw_key(9), IndexEntryValue::presence());
918
919 let gets_before = IndexStore::current_get_call_count();
920 assert_eq!(store.get(&raw_key(7)), Some(IndexEntryValue::presence()));
921 assert_eq!(store.get(&raw_key(8)), None);
922
923 assert_eq!(
924 IndexStore::current_get_call_count().saturating_sub(gets_before),
925 2,
926 "diagnostic index-store get counter should count both hit and miss reads",
927 );
928
929 let range_scans_before = IndexStore::current_range_scan_call_count();
930 let lower = Bound::Included(raw_key(7));
931 let upper = Bound::Included(raw_key(9));
932 store
933 .visit_raw_entries_in_range((&lower, &upper), Direction::Asc, |_key, _entry| Ok(false))
934 .expect("raw index range visit should succeed");
935
936 assert_eq!(
937 IndexStore::current_range_scan_call_count().saturating_sub(range_scans_before),
938 1,
939 "diagnostic index-store range-scan counter should count one range traversal probe",
940 );
941
942 let entries_before = IndexStore::current_entry_read_count();
943 store
944 .visit_entries(|_key, _entry| Ok::<_, Infallible>(IndexStoreVisit::Continue))
945 .expect("index entry visit should succeed");
946
947 assert_eq!(
948 IndexStore::current_entry_read_count().saturating_sub(entries_before),
949 2,
950 "diagnostic index-store entry counter should count yielded traversal entries",
951 );
952 }
953
954 #[test]
955 fn journaled_mixed_index_range_traversal_streams_without_snapshot() {
956 let mut store = IndexStore::init_journaled(test_memory(93));
957 for value in [1_u8, 3, 5] {
958 store.insert(raw_key(value), IndexEntryValue::presence());
959 }
960 store
961 .fold_journaled_materialized_view()
962 .expect("canonical index seed should fold");
963
964 store.insert(raw_key(0), IndexEntryValue::presence());
965 store.insert(raw_key(4), IndexEntryValue::presence());
966 store.insert(raw_key(5), IndexEntryValue::presence());
967 store.remove(&raw_key(1));
968
969 let lower = Bound::Included(raw_key(0));
970 let upper = Bound::Included(raw_key(5));
971
972 reset_journaled_snapshot_call_count_for_tests();
973 let mut asc = Vec::new();
974 store
975 .visit_journaled_entries_in_range((&lower, &upper), Direction::Asc, |key, _value| {
976 asc.push(key.as_bytes()[0]);
977 Ok::<_, Infallible>(asc.len() == 2)
978 })
979 .expect("asc journaled index range traversal should succeed");
980 assert_eq!(asc, vec![0, 3]);
981 assert_eq!(
982 journaled_snapshot_call_count_for_tests(),
983 0,
984 "mixed journaled index range traversal should preserve early stop without materializing a snapshot",
985 );
986
987 reset_journaled_snapshot_call_count_for_tests();
988 let mut desc = Vec::new();
989 store
990 .visit_journaled_entries_in_range((&lower, &upper), Direction::Desc, |key, _value| {
991 desc.push(key.as_bytes()[0]);
992 Ok::<_, Infallible>(desc.len() == 2)
993 })
994 .expect("desc journaled index range traversal should succeed");
995 assert_eq!(desc, vec![5, 4]);
996 assert_eq!(
997 journaled_snapshot_call_count_for_tests(),
998 0,
999 "mixed reverse journaled index range traversal should preserve early stop without materializing a snapshot",
1000 );
1001 }
1002}