Skip to main content

icydb_core/db/index/
store.rs

1//! Module: index::store
2//! Responsibility: journaled-or-heap index-entry storage behind the index-store boundary.
3//! Does not own: range-scan resolution, continuation semantics, or predicate execution.
4//! Boundary: scan/executor layers depend on this storage boundary.
5
6use crate::db::{
7    direction::Direction,
8    index::{IndexEntryValue, key::RawIndexStoreKey},
9    ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
10};
11
12use candid::CandidType;
13use ic_memory::stable_structures::{
14    BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
15};
16use serde::Deserialize;
17#[cfg(any(test, feature = "diagnostics"))]
18use std::cell::Cell;
19use std::collections::{BTreeMap as HeapBTreeMap, BTreeSet};
20use std::ops::Bound;
21
// Test-only, per-thread count of how many times the fold snapshot
// (`journaled_entries_snapshot_for_fold`) has been materialized. Tests reset
// and read it to prove that range traversal streams instead of snapshotting.
#[cfg(test)]
thread_local! {
    static JOURNALED_SNAPSHOT_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
}
26
// Diagnostics-only, per-thread counters:
// - INDEX_STORE_GET_CALL_COUNT: bumped once per `IndexStore::get` call.
// - INDEX_STORE_ENTRY_READ_COUNT: bumped once per entry handed to a traversal visitor.
#[cfg(feature = "diagnostics")]
thread_local! {
    static INDEX_STORE_GET_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
    static INDEX_STORE_ENTRY_READ_COUNT: Cell<u64> = const { Cell::new(0) };
}
32
/// Count one `IndexStore::get` call on this thread, saturating at `u64::MAX`.
#[cfg(feature = "diagnostics")]
fn record_index_store_get_call() {
    INDEX_STORE_GET_CALL_COUNT.with(|calls| {
        let next = calls.get().saturating_add(1);
        calls.set(next);
    });
}
39
/// Count one traversal-yielded index entry on this thread, saturating at `u64::MAX`.
#[cfg(feature = "diagnostics")]
fn record_index_store_entry_read() {
    INDEX_STORE_ENTRY_READ_COUNT.with(|reads| {
        let next = reads.get().saturating_add(1);
        reads.set(next);
    });
}
46
47fn visit_index_store_entry<E>(
48    key: &RawIndexStoreKey,
49    value: &IndexEntryValue,
50    visit: &mut impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<bool, E>,
51) -> Result<bool, E> {
52    #[cfg(feature = "diagnostics")]
53    record_index_store_entry_read();
54
55    visit(key, value)
56}
57
/// Count one fold-snapshot materialization on this thread (test builds only).
#[cfg(test)]
fn record_journaled_snapshot_call() {
    let next = JOURNALED_SNAPSHOT_CALL_COUNT.get().saturating_add(1);
    JOURNALED_SNAPSHOT_CALL_COUNT.set(next);
}
64
/// Zero this thread's fold-snapshot counter so a test can measure a fresh window.
#[cfg(test)]
fn reset_journaled_snapshot_call_count_for_tests() {
    JOURNALED_SNAPSHOT_CALL_COUNT.set(0);
}
69
/// Read this thread's fold-snapshot counter.
#[cfg(test)]
fn journaled_snapshot_call_count_for_tests() -> u64 {
    JOURNALED_SNAPSHOT_CALL_COUNT.get()
}
74
///
/// IndexState
///
/// Explicit lifecycle visibility state for one index store.
/// Visibility matters because planner-visible indexes must already be complete:
/// the index contents are fully built and query-visible for reads.
///
#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
pub enum IndexState {
    /// Build in progress; the store is not eligible for planner visibility.
    Building,
    /// Fully built and planner-visible. This is the default state.
    #[default]
    Ready,
    /// Being dropped; the store is not planner-visible.
    Dropping,
}
89
90impl IndexState {
91    /// Return the stable lowercase text label for this lifecycle state.
92    #[must_use]
93    pub const fn as_str(self) -> &'static str {
94        match self {
95            Self::Building => "building",
96            Self::Ready => "ready",
97            Self::Dropping => "dropping",
98        }
99    }
100}
101
///
/// IndexStore
///
/// Thin persistence wrapper over one journaled or heap BTreeMap.
///
/// Invariant: callers provide already-validated `RawIndexStoreKey`/`IndexEntryValue`.
///

pub struct IndexStore {
    /// Entry storage: either a single heap map or a journaled stable base
    /// with a volatile overlay.
    pub(super) backend: IndexStoreBackend,
    /// Saturating counter bumped by every insert, remove, clear, and fold.
    generation: u64,
    /// Lifecycle visibility state; both constructors start at `IndexState::Ready`.
    state: IndexState,
}
115
/// Storage strategy behind one `IndexStore`.
pub(super) enum IndexStoreBackend {
    /// Single in-memory ordered map holding every entry.
    Heap(HeapBTreeMap<RawIndexStoreKey, IndexEntryValue>),
    /// Stable base map plus an in-memory overlay of changes made since the last fold.
    Journaled {
        /// Stable-memory base map. Within this file it is rewritten only by
        /// `fold_journaled_materialized_view`.
        canonical:
            StableBTreeMap<RawIndexStoreKey, IndexEntryValue, VirtualMemory<DefaultMemoryImpl>>,
        /// Entries written since the last fold; consulted before `canonical` on lookup.
        live: HeapBTreeMap<RawIndexStoreKey, IndexEntryValue>,
        /// Keys removed since the last fold; a tombstoned key reads as absent.
        tombstones: BTreeSet<RawIndexStoreKey>,
    },
}
125
/// Control-flow result for index-store traversal visitors.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) enum IndexStoreVisit {
    /// Keep traversing; the next entry (if any) is delivered to the visitor.
    Continue,
    /// End traversal immediately; no further entries are delivered.
    Stop,
}
132
133impl IndexStoreVisit {
134    const fn should_stop(self) -> bool {
135        matches!(self, Self::Stop)
136    }
137}
138
139impl IndexStore {
140    /// Initialize a volatile heap-backed index store.
141    #[must_use]
142    pub const fn init_heap() -> Self {
143        Self {
144            backend: IndexStoreBackend::Heap(HeapBTreeMap::new()),
145            generation: 0,
146            state: IndexState::Ready,
147        }
148    }
149
150    /// Initialize a journaled cached-stable index store.
151    ///
152    /// Normal writes update only the live materialized projection. The
153    /// canonical stable index is updated by future fold/rebuild paths.
154    #[must_use]
155    pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
156        Self {
157            backend: IndexStoreBackend::Journaled {
158                canonical: StableBTreeMap::init(memory),
159                live: HeapBTreeMap::new(),
160                tombstones: BTreeSet::new(),
161            },
162            generation: 0,
163            state: IndexState::Ready,
164        }
165    }
166
167    /// Visit all index entries in canonical store order without exposing the
168    /// backing stable-map iterator.
169    pub(in crate::db) fn visit_entries<E>(
170        &self,
171        mut visitor: impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<IndexStoreVisit, E>,
172    ) -> Result<(), E> {
173        match &self.backend {
174            IndexStoreBackend::Heap(map) => {
175                for (key, value) in map {
176                    #[cfg(feature = "diagnostics")]
177                    record_index_store_entry_read();
178
179                    if visitor(key, value)?.should_stop() {
180                        return Ok(());
181                    }
182                }
183            }
184            IndexStoreBackend::Journaled {
185                canonical: _,
186                live: _,
187                tombstones: _,
188            } => self.visit_journaled_entries_in_range(
189                (&Bound::Unbounded, &Bound::Unbounded),
190                Direction::Asc,
191                |key, value| visitor(key, value).map(IndexStoreVisit::should_stop),
192            )?,
193        }
194
195        Ok(())
196    }
197
198    pub(in crate::db) fn get(&self, key: &RawIndexStoreKey) -> Option<IndexEntryValue> {
199        #[cfg(feature = "diagnostics")]
200        record_index_store_get_call();
201
202        match &self.backend {
203            IndexStoreBackend::Heap(map) => map.get(key).cloned(),
204            IndexStoreBackend::Journaled { .. } => Self::journaled_get(&self.backend, key),
205        }
206    }
207
208    pub fn len(&self) -> u64 {
209        match &self.backend {
210            IndexStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
211            IndexStoreBackend::Journaled { .. } => {
212                let mut count = 0_u64;
213                let _: Result<(), std::convert::Infallible> = self.visit_entries(|_key, _value| {
214                    count = count.saturating_add(1);
215                    Ok(IndexStoreVisit::Continue)
216                });
217                count
218            }
219        }
220    }
221
222    pub fn is_empty(&self) -> bool {
223        match &self.backend {
224            IndexStoreBackend::Heap(map) => map.is_empty(),
225            IndexStoreBackend::Journaled { .. } => {
226                let mut empty = true;
227                let _: Result<(), std::convert::Infallible> = self.visit_entries(|_key, _value| {
228                    empty = false;
229                    Ok(IndexStoreVisit::Stop)
230                });
231                empty
232            }
233        }
234    }
235
    /// Return the current mutation generation: a saturating counter that is
    /// bumped by every insert, remove, clear, and fold on this store.
    #[must_use]
    pub(in crate::db) const fn generation(&self) -> u64 {
        self.generation
    }
240
    /// Return the explicit lifecycle state for this index store
    /// (`Building`, `Ready`, or `Dropping`).
    #[must_use]
    pub(in crate::db) const fn state(&self) -> IndexState {
        self.state
    }
246
    /// Mark this index store as in-progress and therefore ineligible for
    /// planner visibility until a full authoritative rebuild ends.
    ///
    /// Only the lifecycle state changes; entries and generation are untouched.
    pub(in crate::db) const fn mark_building(&mut self) {
        self.state = IndexState::Building;
    }
252
    /// Mark this index store as fully built and planner-visible again.
    ///
    /// Only the lifecycle state changes; entries and generation are untouched.
    pub(in crate::db) const fn mark_ready(&mut self) {
        self.state = IndexState::Ready;
    }
257
    /// Mark this index store as dropping and therefore not planner-visible.
    ///
    /// Only the lifecycle state changes; entries and generation are untouched.
    pub(in crate::db) const fn mark_dropping(&mut self) {
        self.state = IndexState::Dropping;
    }
262
263    pub(crate) fn insert(
264        &mut self,
265        key: RawIndexStoreKey,
266        entry: IndexEntryValue,
267    ) -> Option<IndexEntryValue> {
268        let previous_journaled = if matches!(self.backend, IndexStoreBackend::Journaled { .. }) {
269            self.get(&key)
270        } else {
271            None
272        };
273        let previous = match &mut self.backend {
274            IndexStoreBackend::Heap(map) => map.insert(key, entry),
275            IndexStoreBackend::Journaled {
276                live, tombstones, ..
277            } => {
278                tombstones.remove(&key);
279                live.insert(key, entry);
280                previous_journaled
281            }
282        };
283        self.bump_generation();
284        previous
285    }
286
287    pub(crate) fn remove(&mut self, key: &RawIndexStoreKey) -> Option<IndexEntryValue> {
288        let previous_journaled = if matches!(self.backend, IndexStoreBackend::Journaled { .. }) {
289            self.get(key)
290        } else {
291            None
292        };
293        let previous = match &mut self.backend {
294            IndexStoreBackend::Heap(map) => map.remove(key),
295            IndexStoreBackend::Journaled {
296                live, tombstones, ..
297            } => {
298                live.remove(key);
299                tombstones.insert(key.clone());
300                previous_journaled
301            }
302        };
303        self.bump_generation();
304        previous
305    }
306
307    pub fn clear(&mut self) {
308        match &mut self.backend {
309            IndexStoreBackend::Heap(map) => map.clear(),
310            IndexStoreBackend::Journaled {
311                canonical,
312                live,
313                tombstones,
314            } => {
315                live.clear();
316                tombstones.clear();
317                for entry in canonical.iter() {
318                    tombstones.insert(entry.key().clone());
319                }
320            }
321        }
322        self.bump_generation();
323    }
324
325    /// Fold the current journaled materialized index view into the canonical
326    /// stable base and clear volatile projection state.
327    pub(in crate::db) fn fold_journaled_materialized_view(
328        &mut self,
329    ) -> Result<(), crate::error::InternalError> {
330        let entries = Self::journaled_entries_snapshot_for_fold(&self.backend);
331        let IndexStoreBackend::Journaled {
332            canonical,
333            live,
334            tombstones,
335        } = &mut self.backend
336        else {
337            return Err(crate::error::InternalError::store_invariant());
338        };
339
340        canonical.clear_new();
341        for (key, value) in entries {
342            canonical.insert(key, value);
343        }
344        live.clear();
345        tombstones.clear();
346        self.bump_generation();
347
348        Ok(())
349    }
350
351    /// Sum of bytes used by all stored index entries.
352    pub fn memory_bytes(&self) -> u64 {
353        let mut bytes = 0u64;
354        let _: Result<(), std::convert::Infallible> = self.visit_entries(|key, value| {
355            bytes = bytes.saturating_add(key.as_bytes().len() as u64 + value.len() as u64);
356            Ok(IndexStoreVisit::Continue)
357        });
358        bytes
359    }
360
361    /// Return the monotonic perf-only count of index-entry fetches seen by this process.
362    #[cfg(feature = "diagnostics")]
363    pub(in crate::db) fn current_get_call_count() -> u64 {
364        INDEX_STORE_GET_CALL_COUNT.with(Cell::get)
365    }
366
367    /// Return the monotonic perf-only count of index entries yielded by traversal.
368    #[cfg(feature = "diagnostics")]
369    pub(in crate::db) fn current_entry_read_count() -> u64 {
370        INDEX_STORE_ENTRY_READ_COUNT.with(Cell::get)
371    }
372
    /// Advance the mutation generation by one, saturating at `u64::MAX`
    /// rather than wrapping.
    const fn bump_generation(&mut self) {
        self.generation = self.generation.saturating_add(1);
    }
376
377    #[cfg(test)]
378    #[must_use]
379    pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
380        match &self.backend {
381            IndexStoreBackend::Journaled { canonical: map, .. } => map.len(),
382            IndexStoreBackend::Heap(_) => 0,
383        }
384    }
385
386    fn journaled_get(
387        backend: &IndexStoreBackend,
388        key: &RawIndexStoreKey,
389    ) -> Option<IndexEntryValue> {
390        let IndexStoreBackend::Journaled {
391            canonical,
392            live,
393            tombstones,
394        } = backend
395        else {
396            return None;
397        };
398
399        if tombstones.contains(key) {
400            return None;
401        }
402        live.get(key).cloned().or_else(|| canonical.get(key))
403    }
404
405    pub(super) fn journaled_entries_snapshot_for_fold(
406        backend: &IndexStoreBackend,
407    ) -> HeapBTreeMap<RawIndexStoreKey, IndexEntryValue> {
408        #[cfg(test)]
409        record_journaled_snapshot_call();
410
411        let IndexStoreBackend::Journaled {
412            canonical,
413            live,
414            tombstones,
415        } = backend
416        else {
417            return HeapBTreeMap::new();
418        };
419
420        let mut entries = HeapBTreeMap::new();
421        for entry in canonical.iter() {
422            let key = entry.key().clone();
423            if !tombstones.contains(&key) {
424                entries.insert(key, entry.value());
425            }
426        }
427        for (key, value) in live {
428            if !tombstones.contains(key) {
429                entries.insert(key.clone(), value.clone());
430            }
431        }
432
433        entries
434    }
435
    /// Visit the merged journaled view (canonical base overlaid by live
    /// entries, minus tombstoned keys) restricted to `bounds`, in `direction`
    /// order.
    ///
    /// `visit` returns `Ok(true)` to end traversal early and `Ok(false)` to
    /// continue; an `Err` is returned to the caller unchanged. When the
    /// backend is not journaled nothing is visited and `Ok(())` is returned.
    ///
    /// NOTE(review): `live.range(..)` is `std::collections::BTreeMap::range`,
    /// which panics on inverted bounds (start > end, or equal excluded
    /// bounds) — presumably callers pass validated bounds; confirm.
    pub(super) fn visit_journaled_entries_in_range<E>(
        &self,
        bounds: (&Bound<RawIndexStoreKey>, &Bound<RawIndexStoreKey>),
        direction: Direction,
        mut visit: impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<bool, E>,
    ) -> Result<(), E> {
        let IndexStoreBackend::Journaled {
            canonical,
            live,
            tombstones,
        } = &self.backend
        else {
            return Ok(());
        };

        // Owned copies of the bounds: the range calls below take them by value.
        let lower = bounds.0.clone();
        let upper = bounds.1.clone();
        match direction {
            // Fast path: empty canonical base, so only live entries can be
            // visible. No tombstone filter is applied here.
            // NOTE(review): relies on insert/remove keeping `live` and
            // `tombstones` disjoint — confirm no other module mutates them.
            Direction::Asc if canonical.is_empty() => {
                for (key, value) in live.range((lower, upper)) {
                    if visit_index_store_entry(key, value, &mut visit)? {
                        return Ok(());
                    }
                }
            }
            Direction::Desc if canonical.is_empty() => {
                for (key, value) in live.range((lower, upper)).rev() {
                    if visit_index_store_entry(key, value, &mut visit)? {
                        return Ok(());
                    }
                }
            }
            // Fast path: no pending overlay at all, so the canonical base is
            // the whole view and can be streamed directly.
            Direction::Asc if live.is_empty() && tombstones.is_empty() => {
                for entry in canonical.range((lower, upper)) {
                    if visit_index_store_entry(entry.key(), &entry.value(), &mut visit)? {
                        return Ok(());
                    }
                }
            }
            Direction::Desc if live.is_empty() && tombstones.is_empty() => {
                for entry in canonical.range((lower, upper)).rev() {
                    if visit_index_store_entry(entry.key(), &entry.value(), &mut visit)? {
                        return Ok(());
                    }
                }
            }
            // General path: hand both ordered streams to `visit_ordered_overlay`
            // with a key comparator and tombstone filters for each side.
            // NOTE(review): how equal keys on both sides are resolved is
            // decided inside `visit_ordered_overlay` — not visible here.
            Direction::Asc => {
                visit_ordered_overlay(
                    canonical.range((lower.clone(), upper.clone())),
                    live.range((lower, upper)),
                    direction,
                    |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
                    |canonical_entry| !tombstones.contains(canonical_entry.key()),
                    |live_entry| !tombstones.contains(live_entry.0),
                    |entry| {
                        let should_stop = match entry {
                            OrderedOverlayEntry::Canonical(canonical_entry) => {
                                visit_index_store_entry(
                                    canonical_entry.key(),
                                    &canonical_entry.value(),
                                    &mut visit,
                                )?
                            }
                            OrderedOverlayEntry::Live((key, value)) => {
                                visit_index_store_entry(key, value, &mut visit)?
                            }
                        };
                        // Translate the visitor's bool into the overlay's control-flow enum.
                        Ok(if should_stop {
                            OrderedOverlayVisit::Stop
                        } else {
                            OrderedOverlayVisit::Continue
                        })
                    },
                )?;
            }
            // Same as the ascending general path, but both input streams are
            // reversed before being handed to the overlay walker.
            Direction::Desc => {
                visit_ordered_overlay(
                    canonical.range((lower.clone(), upper.clone())).rev(),
                    live.range((lower, upper)).rev(),
                    direction,
                    |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
                    |canonical_entry| !tombstones.contains(canonical_entry.key()),
                    |live_entry| !tombstones.contains(live_entry.0),
                    |entry| {
                        let should_stop = match entry {
                            OrderedOverlayEntry::Canonical(canonical_entry) => {
                                visit_index_store_entry(
                                    canonical_entry.key(),
                                    &canonical_entry.value(),
                                    &mut visit,
                                )?
                            }
                            OrderedOverlayEntry::Live((key, value)) => {
                                visit_index_store_entry(key, value, &mut visit)?
                            }
                        };
                        // Translate the visitor's bool into the overlay's control-flow enum.
                        Ok(if should_stop {
                            OrderedOverlayVisit::Stop
                        } else {
                            OrderedOverlayVisit::Continue
                        })
                    },
                )?;
            }
        }

        Ok(())
    }
544}
545
// Unit tests for the index store: diagnostics counters on the heap backend
// and streaming (non-snapshot) range traversal on the journaled backend.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{db::direction::Direction, testing::test_memory, traits::Storable};
    use std::{borrow::Cow, convert::Infallible};

    // Build a one-byte raw index key so expected ordering is easy to read.
    fn raw_key(value: u8) -> RawIndexStoreKey {
        <RawIndexStoreKey as Storable>::from_bytes(Cow::Owned(vec![value]))
    }

    // Counters are per-thread and never reset, so assertions use
    // before/after deltas rather than absolute values.
    #[cfg(feature = "diagnostics")]
    #[test]
    fn index_store_diagnostic_counters_record_gets_and_entry_reads() {
        let mut store = IndexStore::init_heap();
        store.insert(raw_key(7), IndexEntryValue::presence());
        store.insert(raw_key(9), IndexEntryValue::presence());

        let gets_before = IndexStore::current_get_call_count();
        assert_eq!(store.get(&raw_key(7)), Some(IndexEntryValue::presence()));
        assert_eq!(store.get(&raw_key(8)), None);

        assert_eq!(
            IndexStore::current_get_call_count().saturating_sub(gets_before),
            2,
            "diagnostic index-store get counter should count both hit and miss reads",
        );

        let entries_before = IndexStore::current_entry_read_count();
        store
            .visit_entries(|_key, _entry| Ok::<_, Infallible>(IndexStoreVisit::Continue))
            .expect("index entry visit should succeed");

        assert_eq!(
            IndexStore::current_entry_read_count().saturating_sub(entries_before),
            2,
            "diagnostic index-store entry counter should count yielded traversal entries",
        );
    }

    #[test]
    fn journaled_mixed_index_range_traversal_streams_without_snapshot() {
        // Seed keys 1, 3, 5 and fold them into the canonical base.
        let mut store = IndexStore::init_journaled(test_memory(93));
        for value in [1_u8, 3, 5] {
            store.insert(raw_key(value), IndexEntryValue::presence());
        }
        store
            .fold_journaled_materialized_view()
            .expect("canonical index seed should fold");

        // Overlay: new keys 0 and 4, a live write over canonical key 5, and a
        // tombstone for canonical key 1. Visible keys are now 0, 3, 4, 5.
        store.insert(raw_key(0), IndexEntryValue::presence());
        store.insert(raw_key(4), IndexEntryValue::presence());
        store.insert(raw_key(5), IndexEntryValue::presence());
        store.remove(&raw_key(1));

        let lower = Bound::Included(raw_key(0));
        let upper = Bound::Included(raw_key(5));

        // Ascending: the visitor stops after two entries, so only 0 and 3 are seen.
        reset_journaled_snapshot_call_count_for_tests();
        let mut asc = Vec::new();
        store
            .visit_journaled_entries_in_range((&lower, &upper), Direction::Asc, |key, _value| {
                asc.push(key.as_bytes()[0]);
                Ok::<_, Infallible>(asc.len() == 2)
            })
            .expect("asc journaled index range traversal should succeed");
        assert_eq!(asc, vec![0, 3]);
        assert_eq!(
            journaled_snapshot_call_count_for_tests(),
            0,
            "mixed journaled index range traversal should preserve early stop without materializing a snapshot",
        );

        // Descending: the visitor stops after two entries, so only 5 and 4 are seen.
        reset_journaled_snapshot_call_count_for_tests();
        let mut desc = Vec::new();
        store
            .visit_journaled_entries_in_range((&lower, &upper), Direction::Desc, |key, _value| {
                desc.push(key.as_bytes()[0]);
                Ok::<_, Infallible>(desc.len() == 2)
            })
            .expect("desc journaled index range traversal should succeed");
        assert_eq!(desc, vec![5, 4]);
        assert_eq!(
            journaled_snapshot_call_count_for_tests(),
            0,
            "mixed reverse journaled index range traversal should preserve early stop without materializing a snapshot",
        );
    }
}