Skip to main content

icydb_core/db/index/
store.rs

1//! Module: index::store
2//! Responsibility: journaled-or-heap index-entry storage behind the index-store boundary.
3//! Does not own: range-scan resolution, continuation semantics, or predicate execution.
4//! Boundary: scan/executor layers depend on this storage boundary.
5
6use crate::db::{
7    direction::Direction,
8    index::{IndexEntryValue, key::RawIndexStoreKey},
9    ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
10};
11
12use candid::CandidType;
13use ic_memory::stable_structures::{
14    BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
15};
16use serde::Deserialize;
17#[cfg(test)]
18use std::cell::Cell;
19use std::collections::{BTreeMap as HeapBTreeMap, BTreeSet};
20use std::ops::Bound;
21
#[cfg(test)]
thread_local! {
    // Per-thread tally of fold-snapshot materializations; only compiled for tests.
    static JOURNALED_SNAPSHOT_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
}

/// Add one to this thread's snapshot tally, saturating at `u64::MAX`.
#[cfg(test)]
fn record_journaled_snapshot_call() {
    let bumped = JOURNALED_SNAPSHOT_CALL_COUNT.get().saturating_add(1);
    JOURNALED_SNAPSHOT_CALL_COUNT.set(bumped);
}

/// Zero this thread's snapshot tally so a test can observe a fresh count.
#[cfg(test)]
fn reset_journaled_snapshot_call_count_for_tests() {
    JOURNALED_SNAPSHOT_CALL_COUNT.set(0);
}

/// Read this thread's snapshot tally.
#[cfg(test)]
fn journaled_snapshot_call_count_for_tests() -> u64 {
    JOURNALED_SNAPSHOT_CALL_COUNT.get()
}
43
//
// IndexState
//
// Explicit lifecycle visibility state for one index store.
// Visibility matters because planner-visible indexes must already be complete:
// the index contents are fully built and query-visible for reads.
//
// `Ready` is the `Default` variant; `IndexStore::mark_building`,
// `mark_ready`, and `mark_dropping` move a store between states.
//
#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
pub enum IndexState {
    /// Build in progress; not planner-visible until a full rebuild ends.
    Building,
    /// Fully built and planner-visible. This is the default state.
    #[default]
    Ready,
    /// Being dropped; not planner-visible.
    Dropping,
}
58
59impl IndexState {
60    /// Return the stable lowercase text label for this lifecycle state.
61    #[must_use]
62    pub const fn as_str(self) -> &'static str {
63        match self {
64            Self::Building => "building",
65            Self::Ready => "ready",
66            Self::Dropping => "dropping",
67        }
68    }
69}
70
///
/// IndexStore
///
/// Thin persistence wrapper over one journaled or heap BTreeMap.
///
/// Invariant: callers provide already-validated `RawIndexStoreKey`/`IndexEntryValue`.
///

pub struct IndexStore {
    /// Storage backend holding the entries (heap-only or journaled).
    pub(super) backend: IndexStoreBackend,
    /// Mutation counter; bumped (saturating) by insert, remove, clear, and fold.
    generation: u64,
    /// Lifecycle visibility state, read through `state()` and changed by the
    /// `mark_*` methods.
    state: IndexState,
}
84
/// Storage layout behind an `IndexStore`.
pub(super) enum IndexStoreBackend {
    /// Volatile heap map holding every entry directly.
    Heap(HeapBTreeMap<RawIndexStoreKey, IndexEntryValue>),
    /// Stable base map plus an in-heap overlay of pending changes.
    Journaled {
        /// Stable base; rewritten only by `fold_journaled_materialized_view`
        /// within this file.
        canonical:
            StableBTreeMap<RawIndexStoreKey, IndexEntryValue, VirtualMemory<DefaultMemoryImpl>>,
        /// Entries written since the last fold; these shadow `canonical` on reads.
        live: HeapBTreeMap<RawIndexStoreKey, IndexEntryValue>,
        /// Keys removed since the last fold; a tombstoned key reads as absent
        /// even when `canonical` still holds it.
        tombstones: BTreeSet<RawIndexStoreKey>,
    },
}
94
/// Control-flow result for index-store traversal visitors.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) enum IndexStoreVisit {
    /// Keep visiting the next entry.
    Continue,
    /// End the traversal now; the traversal itself still returns `Ok(())`.
    Stop,
}

impl IndexStoreVisit {
    /// Report whether the visitor asked the traversal to end.
    const fn should_stop(self) -> bool {
        matches!(self, Self::Stop)
    }
}
107
108impl IndexStore {
109    /// Initialize a volatile heap-backed index store.
110    #[must_use]
111    pub const fn init_heap() -> Self {
112        Self {
113            backend: IndexStoreBackend::Heap(HeapBTreeMap::new()),
114            generation: 0,
115            state: IndexState::Ready,
116        }
117    }
118
119    /// Initialize a journaled cached-stable index store.
120    ///
121    /// Normal writes update only the live materialized projection. The
122    /// canonical stable index is updated by future fold/rebuild paths.
123    #[must_use]
124    pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
125        Self {
126            backend: IndexStoreBackend::Journaled {
127                canonical: StableBTreeMap::init(memory),
128                live: HeapBTreeMap::new(),
129                tombstones: BTreeSet::new(),
130            },
131            generation: 0,
132            state: IndexState::Ready,
133        }
134    }
135
136    /// Visit all index entries in canonical store order without exposing the
137    /// backing stable-map iterator.
138    pub(in crate::db) fn visit_entries<E>(
139        &self,
140        mut visitor: impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<IndexStoreVisit, E>,
141    ) -> Result<(), E> {
142        match &self.backend {
143            IndexStoreBackend::Heap(map) => {
144                for (key, value) in map {
145                    if visitor(key, value)?.should_stop() {
146                        return Ok(());
147                    }
148                }
149            }
150            IndexStoreBackend::Journaled {
151                canonical: _,
152                live: _,
153                tombstones: _,
154            } => self.visit_journaled_entries_in_range(
155                (&Bound::Unbounded, &Bound::Unbounded),
156                Direction::Asc,
157                |key, value| visitor(key, value).map(IndexStoreVisit::should_stop),
158            )?,
159        }
160
161        Ok(())
162    }
163
164    pub(in crate::db) fn get(&self, key: &RawIndexStoreKey) -> Option<IndexEntryValue> {
165        match &self.backend {
166            IndexStoreBackend::Heap(map) => map.get(key).cloned(),
167            IndexStoreBackend::Journaled { .. } => Self::journaled_get(&self.backend, key),
168        }
169    }
170
171    pub fn len(&self) -> u64 {
172        match &self.backend {
173            IndexStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
174            IndexStoreBackend::Journaled { .. } => {
175                let mut count = 0_u64;
176                let _: Result<(), std::convert::Infallible> = self.visit_entries(|_key, _value| {
177                    count = count.saturating_add(1);
178                    Ok(IndexStoreVisit::Continue)
179                });
180                count
181            }
182        }
183    }
184
185    pub fn is_empty(&self) -> bool {
186        match &self.backend {
187            IndexStoreBackend::Heap(map) => map.is_empty(),
188            IndexStoreBackend::Journaled { .. } => {
189                let mut empty = true;
190                let _: Result<(), std::convert::Infallible> = self.visit_entries(|_key, _value| {
191                    empty = false;
192                    Ok(IndexStoreVisit::Stop)
193                });
194                empty
195            }
196        }
197    }
198
    /// Return the mutation counter for this store.
    ///
    /// The counter is bumped (saturating) by `insert`, `remove`, `clear`, and
    /// `fold_journaled_materialized_view`.
    #[must_use]
    pub(in crate::db) const fn generation(&self) -> u64 {
        self.generation
    }
203
    /// Return the explicit lifecycle state for this index store.
    ///
    /// Stores start as `Ready`; only the `mark_*` methods change the state.
    #[must_use]
    pub(in crate::db) const fn state(&self) -> IndexState {
        self.state
    }
209
    /// Mark this index store as in-progress and therefore ineligible for
    /// planner visibility until a full authoritative rebuild ends.
    ///
    /// Only the state flag changes; entries and generation are untouched.
    pub(in crate::db) const fn mark_building(&mut self) {
        self.state = IndexState::Building;
    }
215
    /// Mark this index store as fully built and planner-visible again.
    ///
    /// Only the state flag changes; entries and generation are untouched.
    pub(in crate::db) const fn mark_ready(&mut self) {
        self.state = IndexState::Ready;
    }
220
    /// Mark this index store as dropping and therefore not planner-visible.
    ///
    /// Only the state flag changes; entries and generation are untouched.
    pub(in crate::db) const fn mark_dropping(&mut self) {
        self.state = IndexState::Dropping;
    }
225
226    pub(crate) fn insert(
227        &mut self,
228        key: RawIndexStoreKey,
229        entry: IndexEntryValue,
230    ) -> Option<IndexEntryValue> {
231        let previous_journaled = if matches!(self.backend, IndexStoreBackend::Journaled { .. }) {
232            self.get(&key)
233        } else {
234            None
235        };
236        let previous = match &mut self.backend {
237            IndexStoreBackend::Heap(map) => map.insert(key, entry),
238            IndexStoreBackend::Journaled {
239                live, tombstones, ..
240            } => {
241                tombstones.remove(&key);
242                live.insert(key, entry);
243                previous_journaled
244            }
245        };
246        self.bump_generation();
247        previous
248    }
249
250    pub(crate) fn remove(&mut self, key: &RawIndexStoreKey) -> Option<IndexEntryValue> {
251        let previous_journaled = if matches!(self.backend, IndexStoreBackend::Journaled { .. }) {
252            self.get(key)
253        } else {
254            None
255        };
256        let previous = match &mut self.backend {
257            IndexStoreBackend::Heap(map) => map.remove(key),
258            IndexStoreBackend::Journaled {
259                live, tombstones, ..
260            } => {
261                live.remove(key);
262                tombstones.insert(key.clone());
263                previous_journaled
264            }
265        };
266        self.bump_generation();
267        previous
268    }
269
270    pub fn clear(&mut self) {
271        match &mut self.backend {
272            IndexStoreBackend::Heap(map) => map.clear(),
273            IndexStoreBackend::Journaled {
274                canonical,
275                live,
276                tombstones,
277            } => {
278                live.clear();
279                tombstones.clear();
280                for entry in canonical.iter() {
281                    tombstones.insert(entry.key().clone());
282                }
283            }
284        }
285        self.bump_generation();
286    }
287
288    /// Fold the current journaled materialized index view into the canonical
289    /// stable base and clear volatile projection state.
290    pub(in crate::db) fn fold_journaled_materialized_view(
291        &mut self,
292    ) -> Result<(), crate::error::InternalError> {
293        let entries = Self::journaled_entries_snapshot_for_fold(&self.backend);
294        let IndexStoreBackend::Journaled {
295            canonical,
296            live,
297            tombstones,
298        } = &mut self.backend
299        else {
300            return Err(crate::error::InternalError::store_invariant());
301        };
302
303        canonical.clear_new();
304        for (key, value) in entries {
305            canonical.insert(key, value);
306        }
307        live.clear();
308        tombstones.clear();
309        self.bump_generation();
310
311        Ok(())
312    }
313
314    /// Sum of bytes used by all stored index entries.
315    pub fn memory_bytes(&self) -> u64 {
316        let mut bytes = 0u64;
317        let _: Result<(), std::convert::Infallible> = self.visit_entries(|key, value| {
318            bytes = bytes.saturating_add(key.as_bytes().len() as u64 + value.len() as u64);
319            Ok(IndexStoreVisit::Continue)
320        });
321        bytes
322    }
323
    /// Advance the mutation counter by one, saturating at `u64::MAX` instead
    /// of wrapping.
    const fn bump_generation(&mut self) {
        self.generation = self.generation.saturating_add(1);
    }
327
328    #[cfg(test)]
329    #[must_use]
330    pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
331        match &self.backend {
332            IndexStoreBackend::Journaled { canonical: map, .. } => map.len(),
333            IndexStoreBackend::Heap(_) => 0,
334        }
335    }
336
337    fn journaled_get(
338        backend: &IndexStoreBackend,
339        key: &RawIndexStoreKey,
340    ) -> Option<IndexEntryValue> {
341        let IndexStoreBackend::Journaled {
342            canonical,
343            live,
344            tombstones,
345        } = backend
346        else {
347            return None;
348        };
349
350        if tombstones.contains(key) {
351            return None;
352        }
353        live.get(key).cloned().or_else(|| canonical.get(key))
354    }
355
356    pub(super) fn journaled_entries_snapshot_for_fold(
357        backend: &IndexStoreBackend,
358    ) -> HeapBTreeMap<RawIndexStoreKey, IndexEntryValue> {
359        #[cfg(test)]
360        record_journaled_snapshot_call();
361
362        let IndexStoreBackend::Journaled {
363            canonical,
364            live,
365            tombstones,
366        } = backend
367        else {
368            return HeapBTreeMap::new();
369        };
370
371        let mut entries = HeapBTreeMap::new();
372        for entry in canonical.iter() {
373            let key = entry.key().clone();
374            if !tombstones.contains(&key) {
375                entries.insert(key, entry.value());
376            }
377        }
378        for (key, value) in live {
379            if !tombstones.contains(key) {
380                entries.insert(key.clone(), value.clone());
381            }
382        }
383
384        entries
385    }
386
    /// Walk the visible journaled entries whose keys fall inside `bounds`, in
    /// the requested `direction`, without building a snapshot.
    ///
    /// `visit` returns `Ok(true)` to stop the walk early; a visitor error is
    /// propagated unchanged. A heap-backed store visits nothing and returns
    /// `Ok(())`.
    ///
    /// NOTE(review): `live.range(..)` is a std `BTreeMap::range`, which panics
    /// on inverted bounds; callers presumably pass ordered bounds — confirm.
    pub(super) fn visit_journaled_entries_in_range<E>(
        &self,
        bounds: (&Bound<RawIndexStoreKey>, &Bound<RawIndexStoreKey>),
        direction: Direction,
        mut visit: impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<bool, E>,
    ) -> Result<(), E> {
        let IndexStoreBackend::Journaled {
            canonical,
            live,
            tombstones,
        } = &self.backend
        else {
            return Ok(());
        };

        // The range APIs take owned bounds, so clone them once up front.
        let lower = bounds.0.clone();
        let upper = bounds.1.clone();
        // Arm order matters: the guarded fast paths must be tried before the
        // general overlay arms for the same direction.
        match direction {
            // Nothing in the stable base: the live projection is the whole view.
            // Tombstones are not consulted here; `insert`/`remove` in this file
            // keep `live` and `tombstones` disjoint.
            Direction::Asc if canonical.is_empty() => {
                for (key, value) in live.range((lower, upper)) {
                    if visit(key, value)? {
                        return Ok(());
                    }
                }
            }
            Direction::Desc if canonical.is_empty() => {
                for (key, value) in live.range((lower, upper)).rev() {
                    if visit(key, value)? {
                        return Ok(());
                    }
                }
            }
            // No pending writes or removals: the stable base is the whole view.
            Direction::Asc if live.is_empty() && tombstones.is_empty() => {
                for entry in canonical.range((lower, upper)) {
                    if visit(entry.key(), &entry.value())? {
                        return Ok(());
                    }
                }
            }
            Direction::Desc if live.is_empty() && tombstones.is_empty() => {
                for entry in canonical.range((lower, upper)).rev() {
                    if visit(entry.key(), &entry.value())? {
                        return Ok(());
                    }
                }
            }
            // General case: hand both ordered streams to `visit_ordered_overlay`
            // with a key comparator and tombstone filters for each side.
            // NOTE(review): presumably the helper merges the two streams and
            // prefers the live entry on equal keys — confirm in `ordered_overlay`.
            Direction::Asc => {
                visit_ordered_overlay(
                    canonical.range((lower.clone(), upper.clone())),
                    live.range((lower, upper)),
                    direction,
                    |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
                    |canonical_entry| !tombstones.contains(canonical_entry.key()),
                    |live_entry| !tombstones.contains(live_entry.0),
                    |entry| {
                        let should_stop = match entry {
                            OrderedOverlayEntry::Canonical(canonical_entry) => {
                                visit(canonical_entry.key(), &canonical_entry.value())?
                            }
                            OrderedOverlayEntry::Live((key, value)) => visit(key, value)?,
                        };
                        Ok(if should_stop {
                            OrderedOverlayVisit::Stop
                        } else {
                            OrderedOverlayVisit::Continue
                        })
                    },
                )?;
            }
            // Same as the ascending overlay, but both streams are reversed; the
            // comparator is unchanged and `direction` is passed to the helper.
            Direction::Desc => {
                visit_ordered_overlay(
                    canonical.range((lower.clone(), upper.clone())).rev(),
                    live.range((lower, upper)).rev(),
                    direction,
                    |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
                    |canonical_entry| !tombstones.contains(canonical_entry.key()),
                    |live_entry| !tombstones.contains(live_entry.0),
                    |entry| {
                        let should_stop = match entry {
                            OrderedOverlayEntry::Canonical(canonical_entry) => {
                                visit(canonical_entry.key(), &canonical_entry.value())?
                            }
                            OrderedOverlayEntry::Live((key, value)) => visit(key, value)?,
                        };
                        Ok(if should_stop {
                            OrderedOverlayVisit::Stop
                        } else {
                            OrderedOverlayVisit::Continue
                        })
                    },
                )?;
            }
        }

        Ok(())
    }
483}
484
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{db::direction::Direction, testing::test_memory, traits::Storable};
    use std::{borrow::Cow, convert::Infallible};

    // Build a one-byte raw key so expected orderings are easy to read.
    fn raw_key(value: u8) -> RawIndexStoreKey {
        <RawIndexStoreKey as Storable>::from_bytes(Cow::Owned(vec![value]))
    }

    // Exercises the general overlay path (canonical, live, and tombstones all
    // non-empty) in both directions, checking early stop and that no fold
    // snapshot is materialized.
    #[test]
    fn journaled_mixed_index_range_traversal_streams_without_snapshot() {
        // Seed the canonical map with keys 1, 3, 5 by inserting and folding.
        let mut store = IndexStore::init_journaled(test_memory(93));
        for value in [1_u8, 3, 5] {
            store.insert(raw_key(value), IndexEntryValue::presence());
        }
        store
            .fold_journaled_materialized_view()
            .expect("canonical index seed should fold");

        // Overlay: new live keys 0 and 4, a live write over canonical key 5,
        // and a tombstone for canonical key 1. Visible keys are 0, 3, 4, 5.
        store.insert(raw_key(0), IndexEntryValue::presence());
        store.insert(raw_key(4), IndexEntryValue::presence());
        store.insert(raw_key(5), IndexEntryValue::presence());
        store.remove(&raw_key(1));

        let lower = Bound::Included(raw_key(0));
        let upper = Bound::Included(raw_key(5));

        // Ascending: stop after two entries; tombstoned key 1 is skipped.
        reset_journaled_snapshot_call_count_for_tests();
        let mut asc = Vec::new();
        store
            .visit_journaled_entries_in_range((&lower, &upper), Direction::Asc, |key, _value| {
                asc.push(key.as_bytes()[0]);
                Ok::<_, Infallible>(asc.len() == 2)
            })
            .expect("asc journaled index range traversal should succeed");
        assert_eq!(asc, vec![0, 3]);
        assert_eq!(
            journaled_snapshot_call_count_for_tests(),
            0,
            "mixed journaled index range traversal should preserve early stop without materializing a snapshot",
        );

        // Descending: stop after two entries; key 5 exists in both layers and
        // must be visited only once.
        reset_journaled_snapshot_call_count_for_tests();
        let mut desc = Vec::new();
        store
            .visit_journaled_entries_in_range((&lower, &upper), Direction::Desc, |key, _value| {
                desc.push(key.as_bytes()[0]);
                Ok::<_, Infallible>(desc.len() == 2)
            })
            .expect("desc journaled index range traversal should succeed");
        assert_eq!(desc, vec![5, 4]);
        assert_eq!(
            journaled_snapshot_call_count_for_tests(),
            0,
            "mixed reverse journaled index range traversal should preserve early stop without materializing a snapshot",
        );
    }
}