Skip to main content

icydb_core/db/index/
store.rs

1//! Module: index::store
2//! Responsibility: stable-or-heap index-entry storage behind the index-store boundary.
3//! Does not own: range-scan resolution, continuation semantics, or predicate execution.
4//! Boundary: scan/executor layers depend on this storage boundary.
5
6use crate::db::{
7    direction::Direction,
8    index::{IndexEntryValue, key::RawIndexStoreKey},
9    ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
10};
11
12use candid::CandidType;
13use ic_memory::stable_structures::{
14    BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
15};
16use serde::Deserialize;
17#[cfg(test)]
18use std::cell::Cell;
19use std::collections::{BTreeMap as HeapBTreeMap, BTreeSet};
20use std::ops::Bound;
21
#[cfg(test)]
thread_local! {
    // Per-thread tally of how many times the fold snapshot helper ran; tests
    // use it to prove that range traversal streams instead of materializing.
    static JOURNALED_SNAPSHOT_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
}

/// Add one (saturating) to this thread's snapshot-call tally.
#[cfg(test)]
fn record_journaled_snapshot_call() {
    let seen_so_far = JOURNALED_SNAPSHOT_CALL_COUNT.get();
    JOURNALED_SNAPSHOT_CALL_COUNT.set(seen_so_far.saturating_add(1));
}

/// Reset this thread's snapshot-call tally to zero.
#[cfg(test)]
fn reset_journaled_snapshot_call_count_for_tests() {
    JOURNALED_SNAPSHOT_CALL_COUNT.set(0);
}

/// Read this thread's snapshot-call tally.
#[cfg(test)]
fn journaled_snapshot_call_count_for_tests() -> u64 {
    JOURNALED_SNAPSHOT_CALL_COUNT.get()
}
43
///
/// IndexState
///
/// Explicit lifecycle visibility state for one index store.
/// Visibility matters because planner-visible indexes must already be complete:
/// the index contents are fully built and query-visible for reads.
///
#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
pub enum IndexState {
    /// The index is being (re)built and is not eligible for planner visibility.
    Building,
    /// The index is fully built and planner-visible. This is the default state.
    #[default]
    Ready,
    /// The index is being dropped and is not planner-visible.
    Dropping,
}
58
59impl IndexState {
60    /// Return the stable lowercase text label for this lifecycle state.
61    #[must_use]
62    pub const fn as_str(self) -> &'static str {
63        match self {
64            Self::Building => "building",
65            Self::Ready => "ready",
66            Self::Dropping => "dropping",
67        }
68    }
69}
70
///
/// IndexStore
///
/// Thin persistence wrapper over one stable or heap BTreeMap, or over a
/// journaled pair of both (stable base plus heap overlay).
///
/// Invariant: callers provide already-validated `RawIndexStoreKey`/`IndexEntryValue`.
///

pub struct IndexStore {
    // Concrete storage; visible to sibling modules under the parent `index` module.
    pub(super) backend: IndexStoreBackend,
    // Saturating counter bumped by every insert/remove/clear/fold on this store.
    generation: u64,
    // Lifecycle visibility state; every constructor starts at `Ready`.
    state: IndexState,
}
84
/// Storage strategy behind one `IndexStore`.
pub(super) enum IndexStoreBackend {
    /// All entries live in one stable-memory BTreeMap.
    Stable(StableBTreeMap<RawIndexStoreKey, IndexEntryValue, VirtualMemory<DefaultMemoryImpl>>),
    /// All entries live in one heap BTreeMap.
    Heap(HeapBTreeMap<RawIndexStoreKey, IndexEntryValue>),
    /// Stable base plus heap-resident changes that have not been folded yet.
    ///
    /// Reads resolve a key as: tombstoned => absent; otherwise the `live`
    /// value if present; otherwise the `canonical` value.
    Journaled {
        // Stable base; rewritten by `fold_journaled_materialized_view`.
        canonical:
            StableBTreeMap<RawIndexStoreKey, IndexEntryValue, VirtualMemory<DefaultMemoryImpl>>,
        // Heap overlay holding entries written since the last fold.
        live: HeapBTreeMap<RawIndexStoreKey, IndexEntryValue>,
        // Keys hidden from the merged view until the next fold.
        tombstones: BTreeSet<RawIndexStoreKey>,
    },
}
95
/// Control-flow result for index-store traversal visitors.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) enum IndexStoreVisit {
    /// Keep traversing: the next entry (if any) is handed to the visitor.
    Continue,
    /// End the traversal successfully without visiting further entries.
    Stop,
}
102
103impl IndexStoreVisit {
104    const fn should_stop(self) -> bool {
105        matches!(self, Self::Stop)
106    }
107}
108
109impl IndexStore {
110    #[must_use]
111    pub fn init(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
112        Self {
113            backend: IndexStoreBackend::Stable(StableBTreeMap::init(memory)),
114            generation: 0,
115            // Existing stores default to Ready until one explicit build/drop
116            // lifecycle is introduced.
117            state: IndexState::Ready,
118        }
119    }
120
121    /// Initialize a volatile heap-backed index store.
122    #[must_use]
123    pub const fn init_heap() -> Self {
124        Self {
125            backend: IndexStoreBackend::Heap(HeapBTreeMap::new()),
126            generation: 0,
127            state: IndexState::Ready,
128        }
129    }
130
131    /// Initialize a journaled cached-stable index store.
132    ///
133    /// Normal writes update only the live materialized projection. The
134    /// canonical stable index is updated by future fold/rebuild paths.
135    #[must_use]
136    pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
137        Self {
138            backend: IndexStoreBackend::Journaled {
139                canonical: StableBTreeMap::init(memory),
140                live: HeapBTreeMap::new(),
141                tombstones: BTreeSet::new(),
142            },
143            generation: 0,
144            state: IndexState::Ready,
145        }
146    }
147
148    /// Visit all index entries in canonical store order without exposing the
149    /// backing stable-map iterator.
150    pub(in crate::db) fn visit_entries<E>(
151        &self,
152        mut visitor: impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<IndexStoreVisit, E>,
153    ) -> Result<(), E> {
154        match &self.backend {
155            IndexStoreBackend::Stable(map) => {
156                for entry in map.iter() {
157                    if visitor(entry.key(), &entry.value())?.should_stop() {
158                        return Ok(());
159                    }
160                }
161            }
162            IndexStoreBackend::Heap(map) => {
163                for (key, value) in map {
164                    if visitor(key, value)?.should_stop() {
165                        return Ok(());
166                    }
167                }
168            }
169            IndexStoreBackend::Journaled {
170                canonical: _,
171                live: _,
172                tombstones: _,
173            } => self.visit_journaled_entries_in_range(
174                (&Bound::Unbounded, &Bound::Unbounded),
175                Direction::Asc,
176                |key, value| visitor(key, value).map(IndexStoreVisit::should_stop),
177            )?,
178        }
179
180        Ok(())
181    }
182
183    pub(in crate::db) fn get(&self, key: &RawIndexStoreKey) -> Option<IndexEntryValue> {
184        match &self.backend {
185            IndexStoreBackend::Stable(map) => map.get(key),
186            IndexStoreBackend::Heap(map) => map.get(key).cloned(),
187            IndexStoreBackend::Journaled { .. } => Self::journaled_get(&self.backend, key),
188        }
189    }
190
191    pub fn len(&self) -> u64 {
192        match &self.backend {
193            IndexStoreBackend::Stable(map) => map.len(),
194            IndexStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
195            IndexStoreBackend::Journaled { .. } => {
196                let mut count = 0_u64;
197                let _: Result<(), std::convert::Infallible> = self.visit_entries(|_key, _value| {
198                    count = count.saturating_add(1);
199                    Ok(IndexStoreVisit::Continue)
200                });
201                count
202            }
203        }
204    }
205
206    pub fn is_empty(&self) -> bool {
207        match &self.backend {
208            IndexStoreBackend::Stable(map) => map.is_empty(),
209            IndexStoreBackend::Heap(map) => map.is_empty(),
210            IndexStoreBackend::Journaled { .. } => {
211                let mut empty = true;
212                let _: Result<(), std::convert::Infallible> = self.visit_entries(|_key, _value| {
213                    empty = false;
214                    Ok(IndexStoreVisit::Stop)
215                });
216                empty
217            }
218        }
219    }
220
    /// Return the current mutation generation of this store.
    ///
    /// The counter starts at 0 and is bumped (saturating) by every
    /// insert/remove/clear/fold performed through this type.
    #[must_use]
    pub(in crate::db) const fn generation(&self) -> u64 {
        self.generation
    }
225
    /// Return the explicit lifecycle state for this index store.
    ///
    /// Every constructor starts at `IndexState::Ready`; the `mark_*` methods
    /// change it.
    #[must_use]
    pub(in crate::db) const fn state(&self) -> IndexState {
        self.state
    }
231
    /// Mark this index store as in-progress and therefore ineligible for
    /// planner visibility until a full authoritative rebuild ends.
    ///
    /// Only the lifecycle state changes; entries and generation are untouched.
    pub(in crate::db) const fn mark_building(&mut self) {
        self.state = IndexState::Building;
    }
237
    /// Mark this index store as fully built and planner-visible again.
    ///
    /// Only the lifecycle state changes; entries and generation are untouched.
    pub(in crate::db) const fn mark_ready(&mut self) {
        self.state = IndexState::Ready;
    }
242
    /// Mark this index store as dropping and therefore not planner-visible.
    ///
    /// Only the lifecycle state changes; entries and generation are untouched.
    pub(in crate::db) const fn mark_dropping(&mut self) {
        self.state = IndexState::Dropping;
    }
247
248    pub(crate) fn insert(
249        &mut self,
250        key: RawIndexStoreKey,
251        entry: IndexEntryValue,
252    ) -> Option<IndexEntryValue> {
253        let previous_journaled = if matches!(self.backend, IndexStoreBackend::Journaled { .. }) {
254            self.get(&key)
255        } else {
256            None
257        };
258        let previous = match &mut self.backend {
259            IndexStoreBackend::Stable(map) => map.insert(key, entry),
260            IndexStoreBackend::Heap(map) => map.insert(key, entry),
261            IndexStoreBackend::Journaled {
262                live, tombstones, ..
263            } => {
264                tombstones.remove(&key);
265                live.insert(key, entry);
266                previous_journaled
267            }
268        };
269        self.bump_generation();
270        previous
271    }
272
273    pub(crate) fn remove(&mut self, key: &RawIndexStoreKey) -> Option<IndexEntryValue> {
274        let previous_journaled = if matches!(self.backend, IndexStoreBackend::Journaled { .. }) {
275            self.get(key)
276        } else {
277            None
278        };
279        let previous = match &mut self.backend {
280            IndexStoreBackend::Stable(map) => map.remove(key),
281            IndexStoreBackend::Heap(map) => map.remove(key),
282            IndexStoreBackend::Journaled {
283                live, tombstones, ..
284            } => {
285                live.remove(key);
286                tombstones.insert(key.clone());
287                previous_journaled
288            }
289        };
290        self.bump_generation();
291        previous
292    }
293
294    pub fn clear(&mut self) {
295        match &mut self.backend {
296            IndexStoreBackend::Stable(map) => map.clear_new(),
297            IndexStoreBackend::Heap(map) => map.clear(),
298            IndexStoreBackend::Journaled {
299                canonical,
300                live,
301                tombstones,
302            } => {
303                live.clear();
304                tombstones.clear();
305                for entry in canonical.iter() {
306                    tombstones.insert(entry.key().clone());
307                }
308            }
309        }
310        self.bump_generation();
311    }
312
313    /// Fold the current journaled materialized index view into the canonical
314    /// stable base and clear volatile projection state.
315    pub(in crate::db) fn fold_journaled_materialized_view(
316        &mut self,
317    ) -> Result<(), crate::error::InternalError> {
318        let entries = Self::journaled_entries_snapshot_for_fold(&self.backend);
319        let IndexStoreBackend::Journaled {
320            canonical,
321            live,
322            tombstones,
323        } = &mut self.backend
324        else {
325            return Err(crate::error::InternalError::store_invariant(
326                "journal index fold requires a journaled index store",
327            ));
328        };
329
330        canonical.clear_new();
331        for (key, value) in entries {
332            canonical.insert(key, value);
333        }
334        live.clear();
335        tombstones.clear();
336        self.bump_generation();
337
338        Ok(())
339    }
340
341    /// Sum of bytes used by all stored index entries.
342    pub fn memory_bytes(&self) -> u64 {
343        let mut bytes = 0u64;
344        let _: Result<(), std::convert::Infallible> = self.visit_entries(|key, value| {
345            bytes = bytes.saturating_add(key.as_bytes().len() as u64 + value.len() as u64);
346            Ok(IndexStoreVisit::Continue)
347        });
348        bytes
349    }
350
    /// Advance the mutation generation by one, saturating at `u64::MAX`
    /// instead of wrapping.
    const fn bump_generation(&mut self) {
        self.generation = self.generation.saturating_add(1);
    }
354
355    #[cfg(test)]
356    #[must_use]
357    pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
358        match &self.backend {
359            IndexStoreBackend::Stable(map)
360            | IndexStoreBackend::Journaled { canonical: map, .. } => map.len(),
361            IndexStoreBackend::Heap(_) => 0,
362        }
363    }
364
365    fn journaled_get(
366        backend: &IndexStoreBackend,
367        key: &RawIndexStoreKey,
368    ) -> Option<IndexEntryValue> {
369        let IndexStoreBackend::Journaled {
370            canonical,
371            live,
372            tombstones,
373        } = backend
374        else {
375            return None;
376        };
377
378        if tombstones.contains(key) {
379            return None;
380        }
381        live.get(key).cloned().or_else(|| canonical.get(key))
382    }
383
384    pub(super) fn journaled_entries_snapshot_for_fold(
385        backend: &IndexStoreBackend,
386    ) -> HeapBTreeMap<RawIndexStoreKey, IndexEntryValue> {
387        #[cfg(test)]
388        record_journaled_snapshot_call();
389
390        let IndexStoreBackend::Journaled {
391            canonical,
392            live,
393            tombstones,
394        } = backend
395        else {
396            return HeapBTreeMap::new();
397        };
398
399        let mut entries = HeapBTreeMap::new();
400        for entry in canonical.iter() {
401            let key = entry.key().clone();
402            if !tombstones.contains(&key) {
403                entries.insert(key, entry.value());
404            }
405        }
406        for (key, value) in live {
407            if !tombstones.contains(key) {
408                entries.insert(key.clone(), value.clone());
409            }
410        }
411
412        entries
413    }
414
    /// Stream the merged view of a journaled store over `bounds` in
    /// `direction`, handing each visible entry to `visit`.
    ///
    /// `visit` returns `Ok(true)` to stop the traversal early (the method then
    /// returns `Ok(())`), `Ok(false)` to continue, and `Err(_)` to abort with
    /// that error. Non-journaled backends visit nothing and return `Ok(())`.
    ///
    /// No snapshot of the merged view is built: entries are read straight from
    /// the canonical and live map range iterators.
    pub(super) fn visit_journaled_entries_in_range<E>(
        &self,
        bounds: (&Bound<RawIndexStoreKey>, &Bound<RawIndexStoreKey>),
        direction: Direction,
        mut visit: impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<bool, E>,
    ) -> Result<(), E> {
        let IndexStoreBackend::Journaled {
            canonical,
            live,
            tombstones,
        } = &self.backend
        else {
            return Ok(());
        };

        // The range calls below take the bounds by value, so clone them out of
        // the borrowed pair.
        // NOTE(review): std `BTreeMap::range` panics on inverted bounds;
        // presumably callers always pass ordered bounds — confirm.
        let lower = bounds.0.clone();
        let upper = bounds.1.clone();
        // Arm order matters: the guarded fast paths must come before the
        // unguarded overlay arms.
        match direction {
            // Fast path: nothing in the canonical base, so only the live
            // overlay is walked. Tombstones are not consulted here.
            // NOTE(review): this relies on `live` never holding a tombstoned
            // key, which `insert`/`remove`/`clear` on this type maintain —
            // confirm no sibling module mutates `backend` differently.
            Direction::Asc if canonical.is_empty() => {
                for (key, value) in live.range((lower, upper)) {
                    if visit(key, value)? {
                        return Ok(());
                    }
                }
            }
            Direction::Desc if canonical.is_empty() => {
                for (key, value) in live.range((lower, upper)).rev() {
                    if visit(key, value)? {
                        return Ok(());
                    }
                }
            }
            // Fast path: no overlay and nothing hidden, so the canonical base
            // alone is the merged view.
            Direction::Asc if live.is_empty() && tombstones.is_empty() => {
                for entry in canonical.range((lower, upper)) {
                    if visit(entry.key(), &entry.value())? {
                        return Ok(());
                    }
                }
            }
            Direction::Desc if live.is_empty() && tombstones.is_empty() => {
                for entry in canonical.range((lower, upper)).rev() {
                    if visit(entry.key(), &entry.value())? {
                        return Ok(());
                    }
                }
            }
            // General case: merge both range iterators through
            // `visit_ordered_overlay`. The closures are, in order: the key
            // comparison between a canonical and a live entry, the canonical
            // and live visibility filters (tombstoned keys are hidden), and
            // the adapter that forwards each merged entry to `visit`.
            // NOTE(review): how equal keys are resolved is decided inside
            // `visit_ordered_overlay`; presumably the live entry wins —
            // confirm there.
            Direction::Asc => {
                visit_ordered_overlay(
                    canonical.range((lower.clone(), upper.clone())),
                    live.range((lower, upper)),
                    direction,
                    |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
                    |canonical_entry| !tombstones.contains(canonical_entry.key()),
                    |live_entry| !tombstones.contains(live_entry.0),
                    |entry| {
                        let should_stop = match entry {
                            OrderedOverlayEntry::Canonical(canonical_entry) => {
                                visit(canonical_entry.key(), &canonical_entry.value())?
                            }
                            OrderedOverlayEntry::Live((key, value)) => visit(key, value)?,
                        };
                        Ok(if should_stop {
                            OrderedOverlayVisit::Stop
                        } else {
                            OrderedOverlayVisit::Continue
                        })
                    },
                )?;
            }
            // Same merge as above, fed with reversed range iterators.
            Direction::Desc => {
                visit_ordered_overlay(
                    canonical.range((lower.clone(), upper.clone())).rev(),
                    live.range((lower, upper)).rev(),
                    direction,
                    |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
                    |canonical_entry| !tombstones.contains(canonical_entry.key()),
                    |live_entry| !tombstones.contains(live_entry.0),
                    |entry| {
                        let should_stop = match entry {
                            OrderedOverlayEntry::Canonical(canonical_entry) => {
                                visit(canonical_entry.key(), &canonical_entry.value())?
                            }
                            OrderedOverlayEntry::Live((key, value)) => visit(key, value)?,
                        };
                        Ok(if should_stop {
                            OrderedOverlayVisit::Stop
                        } else {
                            OrderedOverlayVisit::Continue
                        })
                    },
                )?;
            }
        }

        Ok(())
    }
511}
512
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{db::direction::Direction, testing::test_memory, traits::Storable};
    use std::{borrow::Cow, convert::Infallible};

    // Build a one-byte raw index key so expected traversal order is easy to read.
    fn raw_key(value: u8) -> RawIndexStoreKey {
        <RawIndexStoreKey as Storable>::from_bytes(Cow::Owned(vec![value]))
    }

    // Range traversal over a store with canonical entries, live entries, and a
    // tombstone must honor early stop in both directions without calling the
    // fold snapshot helper.
    #[test]
    fn journaled_mixed_index_range_traversal_streams_without_snapshot() {
        // Seed the canonical base with keys 1, 3, 5 by inserting and folding.
        let mut store = IndexStore::init_journaled(test_memory(93));
        for value in [1_u8, 3, 5] {
            store.insert(raw_key(value), IndexEntryValue::presence());
        }
        store
            .fold_journaled_materialized_view()
            .expect("canonical index seed should fold");

        // Unfolded changes: new keys 0 and 4, an overlay write on canonical
        // key 5, and a tombstone on canonical key 1.
        store.insert(raw_key(0), IndexEntryValue::presence());
        store.insert(raw_key(4), IndexEntryValue::presence());
        store.insert(raw_key(5), IndexEntryValue::presence());
        store.remove(&raw_key(1));

        let lower = Bound::Included(raw_key(0));
        let upper = Bound::Included(raw_key(5));

        // Reset after the seeding fold, which itself takes a snapshot.
        reset_journaled_snapshot_call_count_for_tests();
        let mut asc = Vec::new();
        store
            .visit_journaled_entries_in_range((&lower, &upper), Direction::Asc, |key, _value| {
                asc.push(key.as_bytes()[0]);
                // Ask to stop once two entries were seen.
                Ok::<_, Infallible>(asc.len() == 2)
            })
            .expect("asc journaled index range traversal should succeed");
        // Key 1 is tombstoned, so the first two visible keys are 0 and 3.
        assert_eq!(asc, vec![0, 3]);
        assert_eq!(
            journaled_snapshot_call_count_for_tests(),
            0,
            "mixed journaled index range traversal should preserve early stop without materializing a snapshot",
        );

        reset_journaled_snapshot_call_count_for_tests();
        let mut desc = Vec::new();
        store
            .visit_journaled_entries_in_range((&lower, &upper), Direction::Desc, |key, _value| {
                desc.push(key.as_bytes()[0]);
                Ok::<_, Infallible>(desc.len() == 2)
            })
            .expect("desc journaled index range traversal should succeed");
        // Key 5 exists in both layers and must be reported exactly once.
        assert_eq!(desc, vec![5, 4]);
        assert_eq!(
            journaled_snapshot_call_count_for_tests(),
            0,
            "mixed reverse journaled index range traversal should preserve early stop without materializing a snapshot",
        );
    }
}