1use crate::db::{
7 direction::Direction,
8 index::{
9 IndexEntryValue, IndexId, IndexKeyKind, cardinality::IndexPrefixCardinality,
10 key::RawIndexStoreKey,
11 },
12 ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
13};
14
15use candid::CandidType;
16use ic_memory::stable_structures::{
17 BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
18};
19use serde::Deserialize;
20#[cfg(any(test, feature = "diagnostics"))]
21use std::cell::Cell;
22use std::collections::{BTreeMap as HeapBTreeMap, BTreeSet};
23use std::ops::Bound;
24
// Test-only, per-thread tally of journaled snapshot materializations.
// Incremented by `record_journaled_snapshot_call` and read/reset by the
// `*_for_tests` helpers in this file.
#[cfg(test)]
thread_local! {
    static JOURNALED_SNAPSHOT_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
}
29
// Diagnostics-only, per-thread tally of `IndexStore::get` invocations.
// Incremented by `record_index_store_get_call`.
#[cfg(feature = "diagnostics")]
thread_local! {
    static INDEX_STORE_GET_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
}
34
// Per-thread counters compiled in for tests and for the `diagnostics` feature:
// - range-scan probes (bumped through `IndexStore::record_range_scan_call`),
// - entries handed to a visitor during traversal,
// - prefix-cardinality lookups.
#[cfg(any(test, feature = "diagnostics"))]
thread_local! {
    static INDEX_STORE_RANGE_SCAN_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
    static INDEX_STORE_ENTRY_READ_COUNT: Cell<u64> = const { Cell::new(0) };
    static INDEX_STORE_PREFIX_CARDINALITY_LOOKUP_COUNT: Cell<u64> = const { Cell::new(0) };
}
41
/// Adds one to this thread's `get` call counter, saturating at `u64::MAX`.
#[cfg(feature = "diagnostics")]
fn record_index_store_get_call() {
    let bumped = INDEX_STORE_GET_CALL_COUNT.get().saturating_add(1);
    INDEX_STORE_GET_CALL_COUNT.set(bumped);
}
48
/// Adds one to this thread's range-scan call counter, saturating at `u64::MAX`.
#[cfg(any(test, feature = "diagnostics"))]
fn record_index_store_range_scan_call() {
    let bumped = INDEX_STORE_RANGE_SCAN_CALL_COUNT.get().saturating_add(1);
    INDEX_STORE_RANGE_SCAN_CALL_COUNT.set(bumped);
}
55
/// Adds one to this thread's entry-read counter, saturating at `u64::MAX`.
#[cfg(any(test, feature = "diagnostics"))]
fn record_index_store_entry_read() {
    let bumped = INDEX_STORE_ENTRY_READ_COUNT.get().saturating_add(1);
    INDEX_STORE_ENTRY_READ_COUNT.set(bumped);
}
62
/// Adds one to this thread's prefix-cardinality lookup counter, saturating at `u64::MAX`.
#[cfg(any(test, feature = "diagnostics"))]
fn record_index_store_prefix_cardinality_lookup() {
    let bumped = INDEX_STORE_PREFIX_CARDINALITY_LOOKUP_COUNT
        .get()
        .saturating_add(1);
    INDEX_STORE_PREFIX_CARDINALITY_LOOKUP_COUNT.set(bumped);
}
69
70fn visit_index_store_entry<E>(
71 key: &RawIndexStoreKey,
72 value: &IndexEntryValue,
73 visit: &mut impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<bool, E>,
74) -> Result<bool, E> {
75 #[cfg(any(test, feature = "diagnostics"))]
76 record_index_store_entry_read();
77
78 visit(key, value)
79}
80
/// Adds one to this thread's journaled-snapshot call counter (test builds only).
#[cfg(test)]
fn record_journaled_snapshot_call() {
    let bumped = JOURNALED_SNAPSHOT_CALL_COUNT.get().saturating_add(1);
    JOURNALED_SNAPSHOT_CALL_COUNT.set(bumped);
}
87
/// Zeroes this thread's journaled-snapshot call counter.
#[cfg(test)]
fn reset_journaled_snapshot_call_count_for_tests() {
    JOURNALED_SNAPSHOT_CALL_COUNT.set(0);
}
92
/// Reads this thread's journaled-snapshot call counter.
#[cfg(test)]
fn journaled_snapshot_call_count_for_tests() -> u64 {
    JOURNALED_SNAPSHOT_CALL_COUNT.get()
}
97
/// Lifecycle state recorded on an [`IndexStore`].
///
/// `Ready` is the `Default` value and the state both constructors start in.
#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
pub enum IndexState {
    /// Set by `IndexStore::mark_building`.
    Building,
    /// Set by `IndexStore::mark_ready`; also the default.
    #[default]
    Ready,
    /// Set by `IndexStore::mark_dropping`.
    Dropping,
}
112
113impl IndexState {
114 #[must_use]
116 pub const fn as_str(self) -> &'static str {
117 match self {
118 Self::Building => "building",
119 Self::Ready => "ready",
120 Self::Dropping => "dropping",
121 }
122 }
123}
124
/// Ordered index storage with either a heap-only or a journaled backend,
/// plus bookkeeping for generation, lifecycle state, and prefix cardinality.
pub struct IndexStore {
    /// Where entries physically live; see [`IndexStoreBackend`].
    pub(super) backend: IndexStoreBackend,
    /// Saturating counter bumped by `insert`, `remove`, `clear`, and the journaled fold.
    generation: u64,
    /// Current lifecycle state, changed only through the `mark_*` methods.
    state: IndexState,
    /// Prefix-count tracker updated on every insert/remove and rebuilt on fold.
    prefix_cardinality: IndexPrefixCardinality,
}
139
/// Physical storage behind an [`IndexStore`].
pub(super) enum IndexStoreBackend {
    /// All entries held in one in-memory ordered map.
    Heap(HeapBTreeMap<RawIndexStoreKey, IndexEntryValue>),
    /// Stable-memory map overlaid by in-memory pending changes.
    ///
    /// Reads consult `tombstones` first (a tombstoned key reads as absent),
    /// then `live`, then `canonical`.
    Journaled {
        /// Entries persisted in the stable-structures B-tree.
        canonical:
            StableBTreeMap<RawIndexStoreKey, IndexEntryValue, VirtualMemory<DefaultMemoryImpl>>,
        /// Entries inserted since the last fold; they shadow `canonical`.
        live: HeapBTreeMap<RawIndexStoreKey, IndexEntryValue>,
        /// Keys removed since the last fold; they hide `canonical` entries.
        tombstones: BTreeSet<RawIndexStoreKey>,
    },
}
149
/// Instruction returned by a `visit_entries` callback after each entry.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) enum IndexStoreVisit {
    /// Keep traversing.
    Continue,
    /// End the traversal after the current entry.
    Stop,
}
156
157impl IndexStoreVisit {
158 const fn should_stop(self) -> bool {
159 matches!(self, Self::Stop)
160 }
161}
162
163impl IndexStore {
164 #[must_use]
166 pub const fn init_heap() -> Self {
167 Self {
168 backend: IndexStoreBackend::Heap(HeapBTreeMap::new()),
169 generation: 0,
170 state: IndexState::Ready,
171 prefix_cardinality: IndexPrefixCardinality::synchronized_empty(),
172 }
173 }
174
175 #[must_use]
180 pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
181 let mut store = Self {
182 backend: IndexStoreBackend::Journaled {
183 canonical: StableBTreeMap::init(memory),
184 live: HeapBTreeMap::new(),
185 tombstones: BTreeSet::new(),
186 },
187 generation: 0,
188 state: IndexState::Ready,
189 prefix_cardinality: IndexPrefixCardinality::synchronized_empty(),
190 };
191 store.rebuild_prefix_cardinality_from_entries(Some(0));
192 store
193 }
194
195 pub(in crate::db) fn visit_entries<E>(
198 &self,
199 mut visitor: impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<IndexStoreVisit, E>,
200 ) -> Result<(), E> {
201 match &self.backend {
202 IndexStoreBackend::Heap(map) => {
203 for (key, value) in map {
204 #[cfg(any(test, feature = "diagnostics"))]
205 record_index_store_entry_read();
206
207 if visitor(key, value)?.should_stop() {
208 return Ok(());
209 }
210 }
211 }
212 IndexStoreBackend::Journaled {
213 canonical: _,
214 live: _,
215 tombstones: _,
216 } => self.visit_journaled_entries_in_range(
217 (&Bound::Unbounded, &Bound::Unbounded),
218 Direction::Asc,
219 |key, value| visitor(key, value).map(IndexStoreVisit::should_stop),
220 )?,
221 }
222
223 Ok(())
224 }
225
226 pub(in crate::db) fn get(&self, key: &RawIndexStoreKey) -> Option<IndexEntryValue> {
227 #[cfg(feature = "diagnostics")]
228 record_index_store_get_call();
229
230 match &self.backend {
231 IndexStoreBackend::Heap(map) => map.get(key).cloned(),
232 IndexStoreBackend::Journaled { .. } => Self::journaled_get(&self.backend, key),
233 }
234 }
235
236 pub fn len(&self) -> u64 {
237 match &self.backend {
238 IndexStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
239 IndexStoreBackend::Journaled { .. } => {
240 let mut count = 0_u64;
241 let _: Result<(), std::convert::Infallible> = self.visit_entries(|_key, _value| {
242 count = count.saturating_add(1);
243 Ok(IndexStoreVisit::Continue)
244 });
245 count
246 }
247 }
248 }
249
250 pub fn is_empty(&self) -> bool {
251 match &self.backend {
252 IndexStoreBackend::Heap(map) => map.is_empty(),
253 IndexStoreBackend::Journaled { .. } => {
254 let mut empty = true;
255 let _: Result<(), std::convert::Infallible> = self.visit_entries(|_key, _value| {
256 empty = false;
257 Ok(IndexStoreVisit::Stop)
258 });
259 empty
260 }
261 }
262 }
263
    /// Current store generation; bumped (saturating) by every `insert`,
    /// `remove`, `clear`, and journaled fold.
    #[must_use]
    pub(in crate::db) const fn generation(&self) -> u64 {
        self.generation
    }
268
    /// Current lifecycle state as last set by one of the `mark_*` methods.
    #[must_use]
    pub(in crate::db) const fn state(&self) -> IndexState {
        self.state
    }
274
275 #[must_use]
278 pub(in crate::db) fn exact_prefix_cardinality(
279 &self,
280 data_generation: u64,
281 key_kind: IndexKeyKind,
282 index_id: IndexId,
283 components: &[Vec<u8>],
284 ) -> Option<u64> {
285 #[cfg(any(test, feature = "diagnostics"))]
286 record_index_store_prefix_cardinality_lookup();
287
288 self.prefix_cardinality
289 .exact_count(data_generation, key_kind, index_id, components)
290 }
291
292 #[must_use]
295 pub(in crate::db) fn exact_prefix_cardinality_sum<'a>(
296 &self,
297 data_generation: u64,
298 key_kind: IndexKeyKind,
299 index_id: IndexId,
300 component_prefixes: impl IntoIterator<Item = &'a [Vec<u8>]>,
301 stop_after: Option<u64>,
302 ) -> Option<u64> {
303 #[cfg(any(test, feature = "diagnostics"))]
304 record_index_store_prefix_cardinality_lookup();
305
306 self.prefix_cardinality.exact_count_sum(
307 data_generation,
308 key_kind,
309 index_id,
310 component_prefixes,
311 stop_after,
312 )
313 }
314
315 #[must_use]
318 pub(in crate::db) fn exact_child_prefixes_for_parent_set<'a>(
319 &self,
320 data_generation: u64,
321 key_kind: IndexKeyKind,
322 index_id: IndexId,
323 parent_component_prefixes: impl IntoIterator<Item = &'a [Vec<u8>]>,
324 max_children: usize,
325 ) -> Option<Vec<Vec<Vec<u8>>>> {
326 #[cfg(any(test, feature = "diagnostics"))]
327 record_index_store_prefix_cardinality_lookup();
328
329 self.prefix_cardinality.exact_child_prefixes_for_parent_set(
330 data_generation,
331 key_kind,
332 index_id,
333 parent_component_prefixes,
334 max_children,
335 )
336 }
337
    /// Stamps the prefix-cardinality tracker as synchronized with `generation`
    /// by forwarding to `IndexPrefixCardinality::mark_synchronized`.
    pub(in crate::db) const fn mark_prefix_cardinality_data_generation(&mut self, generation: u64) {
        self.prefix_cardinality.mark_synchronized(generation);
    }
343
    /// Sets the lifecycle state to [`IndexState::Building`]; touches nothing else.
    pub(in crate::db) const fn mark_building(&mut self) {
        self.state = IndexState::Building;
    }
349
    /// Sets the lifecycle state to [`IndexState::Ready`]; touches nothing else.
    pub(in crate::db) const fn mark_ready(&mut self) {
        self.state = IndexState::Ready;
    }
354
    /// Sets the lifecycle state to [`IndexState::Dropping`]; touches nothing else.
    pub(in crate::db) const fn mark_dropping(&mut self) {
        self.state = IndexState::Dropping;
    }
359
360 pub(crate) fn insert(
361 &mut self,
362 key: RawIndexStoreKey,
363 entry: IndexEntryValue,
364 ) -> Option<IndexEntryValue> {
365 let previous_journaled = if matches!(self.backend, IndexStoreBackend::Journaled { .. }) {
366 self.get(&key)
367 } else {
368 None
369 };
370 let cardinality_key = key.clone();
371 let previous = match &mut self.backend {
372 IndexStoreBackend::Heap(map) => map.insert(key, entry.clone()),
373 IndexStoreBackend::Journaled {
374 live, tombstones, ..
375 } => {
376 tombstones.remove(&key);
377 live.insert(key, entry.clone());
378 previous_journaled
379 }
380 };
381 self.prefix_cardinality
382 .apply_insert(&cardinality_key, previous.as_ref(), &entry);
383 self.bump_generation();
384 previous
385 }
386
387 pub(crate) fn remove(&mut self, key: &RawIndexStoreKey) -> Option<IndexEntryValue> {
388 let previous_journaled = if matches!(self.backend, IndexStoreBackend::Journaled { .. }) {
389 self.get(key)
390 } else {
391 None
392 };
393 let previous = match &mut self.backend {
394 IndexStoreBackend::Heap(map) => map.remove(key),
395 IndexStoreBackend::Journaled {
396 live, tombstones, ..
397 } => {
398 live.remove(key);
399 tombstones.insert(key.clone());
400 previous_journaled
401 }
402 };
403 self.prefix_cardinality.apply_remove(key, previous.as_ref());
404 self.bump_generation();
405 previous
406 }
407
408 pub fn clear(&mut self) {
409 match &mut self.backend {
410 IndexStoreBackend::Heap(map) => map.clear(),
411 IndexStoreBackend::Journaled {
412 canonical,
413 live,
414 tombstones,
415 } => {
416 live.clear();
417 tombstones.clear();
418 for entry in canonical.iter() {
419 tombstones.insert(entry.key().clone());
420 }
421 }
422 }
423 self.prefix_cardinality.clear_unsynchronized();
424 self.bump_generation();
425 }
426
427 pub(in crate::db) fn fold_journaled_materialized_view(
430 &mut self,
431 ) -> Result<(), crate::error::InternalError> {
432 let entries = Self::journaled_entries_snapshot_for_fold(&self.backend);
433 let IndexStoreBackend::Journaled {
434 canonical,
435 live,
436 tombstones,
437 } = &mut self.backend
438 else {
439 return Err(crate::error::InternalError::store_invariant());
440 };
441
442 canonical.clear_new();
443 for (key, value) in entries {
444 canonical.insert(key, value);
445 }
446 live.clear();
447 tombstones.clear();
448 let data_generation = self.prefix_cardinality.synchronized_generation();
449 self.rebuild_prefix_cardinality_from_entries(data_generation);
450 self.bump_generation();
451
452 Ok(())
453 }
454
455 pub fn memory_bytes(&self) -> u64 {
457 let mut bytes = 0u64;
458 let _: Result<(), std::convert::Infallible> = self.visit_entries(|key, value| {
459 bytes = bytes.saturating_add(key.as_bytes().len() as u64 + value.len() as u64);
460 Ok(IndexStoreVisit::Continue)
461 });
462 bytes
463 }
464
465 #[cfg(feature = "diagnostics")]
467 pub(in crate::db) fn current_get_call_count() -> u64 {
468 INDEX_STORE_GET_CALL_COUNT.with(Cell::get)
469 }
470
471 #[cfg(any(test, feature = "diagnostics"))]
473 pub(in crate::db) fn current_range_scan_call_count() -> u64 {
474 INDEX_STORE_RANGE_SCAN_CALL_COUNT.with(Cell::get)
475 }
476
477 #[cfg(any(test, feature = "diagnostics"))]
479 pub(in crate::db) fn current_entry_read_count() -> u64 {
480 INDEX_STORE_ENTRY_READ_COUNT.with(Cell::get)
481 }
482
483 #[cfg(test)]
485 pub(in crate::db) fn current_prefix_cardinality_lookup_count() -> u64 {
486 INDEX_STORE_PREFIX_CARDINALITY_LOOKUP_COUNT.with(Cell::get)
487 }
488
    /// Bumps the range-scan call counter on behalf of code in `crate::db::index`
    /// that performs range traversals outside this file.
    #[cfg(any(test, feature = "diagnostics"))]
    pub(in crate::db::index) fn record_range_scan_call() {
        record_index_store_range_scan_call();
    }
493
494 const fn bump_generation(&mut self) {
495 self.generation = self.generation.saturating_add(1);
496 }
497
498 fn rebuild_prefix_cardinality_from_entries(&mut self, data_generation: Option<u64>) {
499 self.prefix_cardinality.clear_unsynchronized();
500 let entries = Self::entries_snapshot_for_cardinality(&self.backend);
501 for (key, value) in &entries {
502 self.prefix_cardinality.apply_insert(key, None, value);
503 }
504 if let Some(data_generation) = data_generation {
505 self.prefix_cardinality.mark_synchronized(data_generation);
506 }
507 }
508
509 fn entries_snapshot_for_cardinality(
510 backend: &IndexStoreBackend,
511 ) -> HeapBTreeMap<RawIndexStoreKey, IndexEntryValue> {
512 match backend {
513 IndexStoreBackend::Heap(map) => map.clone(),
514 IndexStoreBackend::Journaled { .. } => {
515 Self::journaled_entries_snapshot_for_fold(backend)
516 }
517 }
518 }
519
520 #[cfg(test)]
521 #[must_use]
522 pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
523 match &self.backend {
524 IndexStoreBackend::Journaled { canonical: map, .. } => map.len(),
525 IndexStoreBackend::Heap(_) => 0,
526 }
527 }
528
529 fn journaled_get(
530 backend: &IndexStoreBackend,
531 key: &RawIndexStoreKey,
532 ) -> Option<IndexEntryValue> {
533 let IndexStoreBackend::Journaled {
534 canonical,
535 live,
536 tombstones,
537 } = backend
538 else {
539 return None;
540 };
541
542 if tombstones.contains(key) {
543 return None;
544 }
545 live.get(key).cloned().or_else(|| canonical.get(key))
546 }
547
548 pub(super) fn journaled_entries_snapshot_for_fold(
549 backend: &IndexStoreBackend,
550 ) -> HeapBTreeMap<RawIndexStoreKey, IndexEntryValue> {
551 #[cfg(test)]
552 record_journaled_snapshot_call();
553
554 let IndexStoreBackend::Journaled {
555 canonical,
556 live,
557 tombstones,
558 } = backend
559 else {
560 return HeapBTreeMap::new();
561 };
562
563 let mut entries = HeapBTreeMap::new();
564 for entry in canonical.iter() {
565 let key = entry.key().clone();
566 if !tombstones.contains(&key) {
567 entries.insert(key, entry.value());
568 }
569 }
570 for (key, value) in live {
571 if !tombstones.contains(key) {
572 entries.insert(key.clone(), value.clone());
573 }
574 }
575
576 entries
577 }
578
    /// Streams the visible entries of a journaled backend that fall inside
    /// `bounds`, in `direction` order, without materializing a snapshot.
    ///
    /// `visit` returns `Ok(true)` to stop the traversal and `Ok(false)` to
    /// continue; an `Err` is propagated immediately. Every entry handed to
    /// `visit` goes through `visit_index_store_entry`, so it is counted in
    /// test/diagnostics builds. Returns `Ok(())` without visiting anything
    /// when the backend is not journaled.
    ///
    /// NOTE(review): `live.range(..)` is `std::collections::BTreeMap::range`,
    /// which panics when the start bound is greater than the end bound or
    /// when both are `Excluded` with equal keys — confirm callers validate
    /// `bounds` before calling.
    pub(super) fn visit_journaled_entries_in_range<E>(
        &self,
        bounds: (&Bound<RawIndexStoreKey>, &Bound<RawIndexStoreKey>),
        direction: Direction,
        mut visit: impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<bool, E>,
    ) -> Result<(), E> {
        let IndexStoreBackend::Journaled {
            canonical,
            live,
            tombstones,
        } = &self.backend
        else {
            return Ok(());
        };

        // Owned copies of the bounds; both maps' `range` calls take them by value.
        let lower = bounds.0.clone();
        let upper = bounds.1.clone();
        match direction {
            // Fast path: nothing canonical, so only the live overlay is walked.
            // Tombstones are not consulted in this branch.
            Direction::Asc if canonical.is_empty() => {
                for (key, value) in live.range((lower, upper)) {
                    if visit_index_store_entry(key, value, &mut visit)? {
                        return Ok(());
                    }
                }
            }
            Direction::Desc if canonical.is_empty() => {
                for (key, value) in live.range((lower, upper)).rev() {
                    if visit_index_store_entry(key, value, &mut visit)? {
                        return Ok(());
                    }
                }
            }
            // Fast path: no pending overlay changes, so the canonical map is
            // streamed directly.
            Direction::Asc if live.is_empty() && tombstones.is_empty() => {
                for entry in canonical.range((lower, upper)) {
                    if visit_index_store_entry(entry.key(), &entry.value(), &mut visit)? {
                        return Ok(());
                    }
                }
            }
            Direction::Desc if live.is_empty() && tombstones.is_empty() => {
                for entry in canonical.range((lower, upper)).rev() {
                    if visit_index_store_entry(entry.key(), &entry.value(), &mut visit)? {
                        return Ok(());
                    }
                }
            }
            // General path: both sources are passed to `visit_ordered_overlay`
            // together with a key comparator and per-source tombstone filters.
            // NOTE(review): the merge itself (including which side wins on
            // equal keys) is implemented in `visit_ordered_overlay`, outside
            // this file — the test below expects a live entry to be yielded
            // once when the same key also exists canonically.
            Direction::Asc => {
                visit_ordered_overlay(
                    canonical.range((lower.clone(), upper.clone())),
                    live.range((lower, upper)),
                    direction,
                    |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
                    |canonical_entry| !tombstones.contains(canonical_entry.key()),
                    |live_entry| !tombstones.contains(live_entry.0),
                    |entry| {
                        let should_stop = match entry {
                            OrderedOverlayEntry::Canonical(canonical_entry) => {
                                visit_index_store_entry(
                                    canonical_entry.key(),
                                    &canonical_entry.value(),
                                    &mut visit,
                                )?
                            }
                            OrderedOverlayEntry::Live((key, value)) => {
                                visit_index_store_entry(key, value, &mut visit)?
                            }
                        };
                        // Translate the caller's bool into the overlay's stop signal.
                        Ok(if should_stop {
                            OrderedOverlayVisit::Stop
                        } else {
                            OrderedOverlayVisit::Continue
                        })
                    },
                )?;
            }
            // Same as the ascending general path, with both iterators reversed.
            Direction::Desc => {
                visit_ordered_overlay(
                    canonical.range((lower.clone(), upper.clone())).rev(),
                    live.range((lower, upper)).rev(),
                    direction,
                    |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
                    |canonical_entry| !tombstones.contains(canonical_entry.key()),
                    |live_entry| !tombstones.contains(live_entry.0),
                    |entry| {
                        let should_stop = match entry {
                            OrderedOverlayEntry::Canonical(canonical_entry) => {
                                visit_index_store_entry(
                                    canonical_entry.key(),
                                    &canonical_entry.value(),
                                    &mut visit,
                                )?
                            }
                            OrderedOverlayEntry::Live((key, value)) => {
                                visit_index_store_entry(key, value, &mut visit)?
                            }
                        };
                        Ok(if should_stop {
                            OrderedOverlayVisit::Stop
                        } else {
                            OrderedOverlayVisit::Continue
                        })
                    },
                )?;
            }
        }

        Ok(())
    }
687}
688
689#[cfg(test)]
690mod tests {
691 use super::*;
692 use crate::{
693 db::{
694 direction::Direction,
695 index::{IndexId, IndexKey, IndexKeyKind},
696 key_taxonomy::{PrimaryKeyComponent, PrimaryKeyValue},
697 },
698 testing::test_memory,
699 traits::Storable,
700 types::EntityTag,
701 };
702 use std::{borrow::Cow, convert::Infallible};
703
704 fn raw_key(value: u8) -> RawIndexStoreKey {
705 <RawIndexStoreKey as Storable>::from_bytes(Cow::Owned(vec![value]))
706 }
707
708 fn indexed_raw_key(
709 index_id: &IndexId,
710 components: Vec<Vec<u8>>,
711 primary_key: u64,
712 ) -> RawIndexStoreKey {
713 indexed_raw_key_with_kind(index_id, IndexKeyKind::User, components, primary_key)
714 }
715
716 fn indexed_raw_key_with_kind(
717 index_id: &IndexId,
718 key_kind: IndexKeyKind,
719 components: Vec<Vec<u8>>,
720 primary_key: u64,
721 ) -> RawIndexStoreKey {
722 IndexKey::new_from_components_with_primary_key_value(
723 index_id,
724 key_kind,
725 components.as_slice(),
726 &PrimaryKeyValue::from(PrimaryKeyComponent::Nat64(primary_key)),
727 )
728 .expect("test index key should build")
729 .to_raw()
730 .expect("test index key should encode")
731 }
732
733 fn malformed_index_entry_value() -> IndexEntryValue {
734 <IndexEntryValue as Storable>::from_bytes(Cow::Owned(vec![0xFF]))
735 }
736
737 fn missing_index_entry_value() -> IndexEntryValue {
738 <IndexEntryValue as Storable>::from_bytes(Cow::Owned(vec![1]))
739 }
740
741 #[test]
742 fn index_prefix_cardinality_requires_explicit_data_generation_sync() {
743 let index_id = IndexId::new(EntityTag::new(0xCA7D), 1);
744 let collection = b"collection-a".to_vec();
745 let draft = b"Draft".to_vec();
746 let review = b"Review".to_vec();
747 let mut store = IndexStore::init_heap();
748
749 store.insert(
750 indexed_raw_key(&index_id, vec![collection.clone(), draft.clone()], 1),
751 IndexEntryValue::presence(),
752 );
753 store.insert(
754 indexed_raw_key(&index_id, vec![collection.clone(), draft.clone()], 2),
755 IndexEntryValue::presence(),
756 );
757 store.insert(
758 indexed_raw_key(&index_id, vec![collection.clone(), review.clone()], 3),
759 IndexEntryValue::presence(),
760 );
761
762 assert_eq!(
763 store.exact_prefix_cardinality(
764 0,
765 IndexKeyKind::User,
766 index_id,
767 std::slice::from_ref(&collection),
768 ),
769 None,
770 "raw index mutations must not be trusted until row generation sync is stamped",
771 );
772
773 store.mark_prefix_cardinality_data_generation(7);
774
775 assert_eq!(
776 store.exact_prefix_cardinality(
777 7,
778 IndexKeyKind::User,
779 index_id,
780 std::slice::from_ref(&collection),
781 ),
782 Some(3),
783 );
784 assert_eq!(
785 store.exact_prefix_cardinality(
786 7,
787 IndexKeyKind::User,
788 index_id,
789 &[collection.clone(), draft],
790 ),
791 Some(2),
792 );
793 assert_eq!(
794 store.exact_prefix_cardinality(8, IndexKeyKind::User, index_id, &[collection, review],),
795 None,
796 "row generation drift should force the caller to use the existing-row fallback",
797 );
798 }
799
800 #[test]
801 fn index_prefix_cardinality_enumerates_bounded_child_prefixes() {
802 let index_id = IndexId::new(EntityTag::new(0xCA7D), 1);
803 let collection = b"collection-a".to_vec();
804 let other_collection = b"collection-b".to_vec();
805 let draft = b"Draft".to_vec();
806 let review = b"Review".to_vec();
807 let published = b"Published".to_vec();
808 let mut store = IndexStore::init_heap();
809
810 store.insert(
811 indexed_raw_key(&index_id, vec![collection.clone(), draft.clone()], 1),
812 IndexEntryValue::presence(),
813 );
814 store.insert(
815 indexed_raw_key(&index_id, vec![collection.clone(), draft.clone()], 2),
816 IndexEntryValue::presence(),
817 );
818 store.insert(
819 indexed_raw_key(&index_id, vec![collection.clone(), review.clone()], 3),
820 IndexEntryValue::presence(),
821 );
822 store.insert(
823 indexed_raw_key(
824 &index_id,
825 vec![other_collection.clone(), published.clone()],
826 4,
827 ),
828 IndexEntryValue::presence(),
829 );
830 store.mark_prefix_cardinality_data_generation(7);
831
832 assert_eq!(
833 store.exact_child_prefixes_for_parent_set(
834 7,
835 IndexKeyKind::User,
836 index_id,
837 [std::slice::from_ref(&collection)],
838 4,
839 ),
840 Some(vec![
841 vec![collection.clone(), draft],
842 vec![collection.clone(), review],
843 ]),
844 "child-prefix enumeration should return deterministic unique children under the requested parent",
845 );
846 assert_eq!(
847 store.exact_child_prefixes_for_parent_set(
848 7,
849 IndexKeyKind::User,
850 index_id,
851 [std::slice::from_ref(&other_collection)],
852 4,
853 ),
854 Some(vec![vec![other_collection, published]]),
855 "child-prefix enumeration must stay scoped to the requested parent prefix",
856 );
857 assert_eq!(
858 store.exact_child_prefixes_for_parent_set(
859 8,
860 IndexKeyKind::User,
861 index_id,
862 [std::slice::from_ref(&collection)],
863 4,
864 ),
865 None,
866 "row generation drift should keep child-prefix expansion fail-closed",
867 );
868 assert_eq!(
869 store.exact_child_prefixes_for_parent_set(
870 7,
871 IndexKeyKind::User,
872 index_id,
873 [std::slice::from_ref(&collection)],
874 1,
875 ),
876 None,
877 "over-cap child-prefix expansion should fall back to the existing route",
878 );
879 }
880
881 #[test]
882 fn index_prefix_cardinality_batches_sparse_child_prefixes() {
883 let index_id = IndexId::new(EntityTag::new(0xCA7D), 1);
884 let collection = b"collection-a".to_vec();
885 let other_collection = b"collection-b".to_vec();
886 let missing_a = b"missing-a".to_vec();
887 let missing_b = b"missing-b".to_vec();
888 let draft = b"Draft".to_vec();
889 let review = b"Review".to_vec();
890 let published = b"Published".to_vec();
891 let mut store = IndexStore::init_heap();
892
893 store.insert(
894 indexed_raw_key(&index_id, vec![collection.clone(), draft.clone()], 1),
895 IndexEntryValue::presence(),
896 );
897 store.insert(
898 indexed_raw_key(&index_id, vec![collection.clone(), review.clone()], 2),
899 IndexEntryValue::presence(),
900 );
901 store.insert(
902 indexed_raw_key(
903 &index_id,
904 vec![other_collection.clone(), published.clone()],
905 3,
906 ),
907 IndexEntryValue::presence(),
908 );
909 store.mark_prefix_cardinality_data_generation(7);
910
911 let parents = [
912 std::slice::from_ref(&missing_a),
913 std::slice::from_ref(&collection),
914 std::slice::from_ref(&missing_b),
915 std::slice::from_ref(&other_collection),
916 ];
917 assert_eq!(
918 store.exact_child_prefixes_for_parent_set(7, IndexKeyKind::User, index_id, parents, 4,),
919 Some(vec![
920 vec![collection.clone(), draft],
921 vec![collection.clone(), review],
922 vec![other_collection.clone(), published],
923 ]),
924 "batched child-prefix enumeration should skip missing sparse parents and return deterministic real children",
925 );
926 assert_eq!(
927 store.exact_child_prefixes_for_parent_set(
928 7,
929 IndexKeyKind::User,
930 index_id,
931 [
932 std::slice::from_ref(&missing_a),
933 std::slice::from_ref(&missing_b)
934 ],
935 4,
936 ),
937 Some(Vec::new()),
938 "missing-only sparse parent sets should be proven empty when cardinality is synchronized",
939 );
940 assert_eq!(
941 store.exact_child_prefixes_for_parent_set(
942 7,
943 IndexKeyKind::User,
944 index_id,
945 [
946 std::slice::from_ref(&collection),
947 std::slice::from_ref(&other_collection)
948 ],
949 2,
950 ),
951 None,
952 "over-cap sparse parent-set expansion should fail closed",
953 );
954 assert_eq!(
955 store.exact_child_prefixes_for_parent_set(
956 8,
957 IndexKeyKind::User,
958 index_id,
959 [std::slice::from_ref(&collection)],
960 4,
961 ),
962 None,
963 "generation drift should keep batched child-prefix expansion fail-closed",
964 );
965 }
966
967 #[test]
968 fn index_prefix_cardinality_ignores_system_index_mutations() {
969 let user_index_id = IndexId::new(EntityTag::new(0xCA7D), 1);
970 let system_index_id = IndexId::new(EntityTag::new(0xCA7D), 2);
971 let collection = b"collection-a".to_vec();
972 let draft = b"Draft".to_vec();
973 let system_component = b"reverse-edge".to_vec();
974 let mut store = IndexStore::init_heap();
975
976 store.insert(
977 indexed_raw_key(&user_index_id, vec![collection.clone(), draft.clone()], 1),
978 IndexEntryValue::presence(),
979 );
980 store.mark_prefix_cardinality_data_generation(7);
981
982 assert_eq!(
983 store.exact_prefix_cardinality(
984 7,
985 IndexKeyKind::User,
986 user_index_id,
987 &[collection.clone(), draft.clone()],
988 ),
989 Some(1),
990 );
991
992 let system_key = indexed_raw_key_with_kind(
993 &system_index_id,
994 IndexKeyKind::System,
995 vec![system_component],
996 1,
997 );
998 store.insert(system_key.clone(), IndexEntryValue::presence());
999 assert_eq!(
1000 store.exact_prefix_cardinality(
1001 7,
1002 IndexKeyKind::User,
1003 user_index_id,
1004 &[collection.clone(), draft.clone()],
1005 ),
1006 Some(1),
1007 "system index writes must not invalidate synchronized user-prefix cardinality",
1008 );
1009
1010 store.remove(&system_key);
1011 assert_eq!(
1012 store.exact_prefix_cardinality(
1013 7,
1014 IndexKeyKind::User,
1015 user_index_id,
1016 &[collection.clone(), draft.clone()],
1017 ),
1018 Some(1),
1019 "system index removals must not invalidate synchronized user-prefix cardinality",
1020 );
1021
1022 let malformed_system_key = indexed_raw_key_with_kind(
1023 &system_index_id,
1024 IndexKeyKind::System,
1025 vec![b"malformed-reverse-edge".to_vec()],
1026 2,
1027 );
1028 store.insert(malformed_system_key.clone(), malformed_index_entry_value());
1029 assert_eq!(
1030 store.exact_prefix_cardinality(
1031 7,
1032 IndexKeyKind::User,
1033 user_index_id,
1034 &[collection.clone(), draft.clone()],
1035 ),
1036 Some(1),
1037 "malformed system index payloads must not invalidate user-prefix cardinality",
1038 );
1039
1040 store.remove(&malformed_system_key);
1041 assert_eq!(
1042 store.exact_prefix_cardinality(
1043 7,
1044 IndexKeyKind::User,
1045 user_index_id,
1046 &[collection.clone(), draft],
1047 ),
1048 Some(1),
1049 "malformed system index removals must not invalidate user-prefix cardinality",
1050 );
1051
1052 let review = b"Review".to_vec();
1053 store.insert(
1054 indexed_raw_key(&user_index_id, vec![collection.clone(), review.clone()], 2),
1055 IndexEntryValue::presence(),
1056 );
1057 assert_eq!(
1058 store.exact_prefix_cardinality(
1059 7,
1060 IndexKeyKind::User,
1061 user_index_id,
1062 &[collection, review]
1063 ),
1064 None,
1065 "user-prefix count changes must still require a fresh row-generation stamp",
1066 );
1067 }
1068
1069 #[test]
1070 fn index_prefix_cardinality_ignores_missing_user_index_mutations() {
1071 let index_id = IndexId::new(EntityTag::new(0xCA7D), 1);
1072 let collection = b"collection-a".to_vec();
1073 let draft = b"Draft".to_vec();
1074 let mut store = IndexStore::init_heap();
1075
1076 store.insert(
1077 indexed_raw_key(&index_id, vec![collection.clone(), draft.clone()], 1),
1078 IndexEntryValue::presence(),
1079 );
1080 store.mark_prefix_cardinality_data_generation(7);
1081
1082 let stale_key = indexed_raw_key(&index_id, vec![collection.clone(), draft.clone()], 2);
1083 store.insert(stale_key.clone(), missing_index_entry_value());
1084 assert_eq!(
1085 store.exact_prefix_cardinality(
1086 7,
1087 IndexKeyKind::User,
1088 index_id,
1089 &[collection.clone(), draft.clone()],
1090 ),
1091 Some(1),
1092 "missing user index entries must not affect synchronized prefix cardinality",
1093 );
1094
1095 store.remove(&stale_key);
1096 assert_eq!(
1097 store.exact_prefix_cardinality(7, IndexKeyKind::User, index_id, &[collection, draft],),
1098 Some(1),
1099 "missing user index removals must not affect synchronized prefix cardinality",
1100 );
1101 }
1102
1103 #[cfg(feature = "diagnostics")]
1104 #[test]
1105 fn index_store_diagnostic_counters_record_gets_range_scans_and_entry_reads() {
1106 let mut store = IndexStore::init_heap();
1107 store.insert(raw_key(7), IndexEntryValue::presence());
1108 store.insert(raw_key(9), IndexEntryValue::presence());
1109
1110 let gets_before = IndexStore::current_get_call_count();
1111 assert_eq!(store.get(&raw_key(7)), Some(IndexEntryValue::presence()));
1112 assert_eq!(store.get(&raw_key(8)), None);
1113
1114 assert_eq!(
1115 IndexStore::current_get_call_count().saturating_sub(gets_before),
1116 2,
1117 "diagnostic index-store get counter should count both hit and miss reads",
1118 );
1119
1120 let range_scans_before = IndexStore::current_range_scan_call_count();
1121 let lower = Bound::Included(raw_key(7));
1122 let upper = Bound::Included(raw_key(9));
1123 store
1124 .visit_raw_entries_in_range((&lower, &upper), Direction::Asc, |_key, _entry| Ok(false))
1125 .expect("raw index range visit should succeed");
1126
1127 assert_eq!(
1128 IndexStore::current_range_scan_call_count().saturating_sub(range_scans_before),
1129 1,
1130 "diagnostic index-store range-scan counter should count one range traversal probe",
1131 );
1132
1133 let entries_before = IndexStore::current_entry_read_count();
1134 store
1135 .visit_entries(|_key, _entry| Ok::<_, Infallible>(IndexStoreVisit::Continue))
1136 .expect("index entry visit should succeed");
1137
1138 assert_eq!(
1139 IndexStore::current_entry_read_count().saturating_sub(entries_before),
1140 2,
1141 "diagnostic index-store entry counter should count yielded traversal entries",
1142 );
1143 }
1144
1145 #[test]
1146 fn journaled_mixed_index_range_traversal_streams_without_snapshot() {
1147 let mut store = IndexStore::init_journaled(test_memory(93));
1148 for value in [1_u8, 3, 5] {
1149 store.insert(raw_key(value), IndexEntryValue::presence());
1150 }
1151 store
1152 .fold_journaled_materialized_view()
1153 .expect("canonical index seed should fold");
1154
1155 store.insert(raw_key(0), IndexEntryValue::presence());
1156 store.insert(raw_key(4), IndexEntryValue::presence());
1157 store.insert(raw_key(5), IndexEntryValue::presence());
1158 store.remove(&raw_key(1));
1159
1160 let lower = Bound::Included(raw_key(0));
1161 let upper = Bound::Included(raw_key(5));
1162
1163 reset_journaled_snapshot_call_count_for_tests();
1164 let mut asc = Vec::new();
1165 store
1166 .visit_journaled_entries_in_range((&lower, &upper), Direction::Asc, |key, _value| {
1167 asc.push(key.as_bytes()[0]);
1168 Ok::<_, Infallible>(asc.len() == 2)
1169 })
1170 .expect("asc journaled index range traversal should succeed");
1171 assert_eq!(asc, vec![0, 3]);
1172 assert_eq!(
1173 journaled_snapshot_call_count_for_tests(),
1174 0,
1175 "mixed journaled index range traversal should preserve early stop without materializing a snapshot",
1176 );
1177
1178 reset_journaled_snapshot_call_count_for_tests();
1179 let mut desc = Vec::new();
1180 store
1181 .visit_journaled_entries_in_range((&lower, &upper), Direction::Desc, |key, _value| {
1182 desc.push(key.as_bytes()[0]);
1183 Ok::<_, Infallible>(desc.len() == 2)
1184 })
1185 .expect("desc journaled index range traversal should succeed");
1186 assert_eq!(desc, vec![5, 4]);
1187 assert_eq!(
1188 journaled_snapshot_call_count_for_tests(),
1189 0,
1190 "mixed reverse journaled index range traversal should preserve early stop without materializing a snapshot",
1191 );
1192 }
1193}