Skip to main content

icydb_core/db/index/
store.rs

1//! Module: index::store
2//! Responsibility: stable-or-heap index-entry storage behind the index-store boundary.
3//! Does not own: range-scan resolution, continuation semantics, or predicate execution.
4//! Boundary: scan/executor layers depend on this storage boundary.
5
6use crate::db::{
7    direction::Direction,
8    index::{IndexEntryValue, key::RawIndexStoreKey},
9    ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
10};
11
12use candid::CandidType;
13use ic_memory::stable_structures::{
14    BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
15};
16use serde::Deserialize;
17#[cfg(test)]
18use std::cell::Cell;
19use std::collections::{BTreeMap as HeapBTreeMap, BTreeSet};
20use std::ops::Bound;
21
22#[cfg(test)]
23thread_local! {
24    static JOURNALED_SNAPSHOT_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
25}
26
27#[cfg(test)]
28fn record_journaled_snapshot_call() {
29    JOURNALED_SNAPSHOT_CALL_COUNT.with(|count| {
30        count.set(count.get().saturating_add(1));
31    });
32}
33
34#[cfg(test)]
35fn reset_journaled_snapshot_call_count_for_tests() {
36    JOURNALED_SNAPSHOT_CALL_COUNT.with(|count| count.set(0));
37}
38
39#[cfg(test)]
40fn journaled_snapshot_call_count_for_tests() -> u64 {
41    JOURNALED_SNAPSHOT_CALL_COUNT.with(Cell::get)
42}
43
44//
45// IndexState
46//
47// Explicit lifecycle visibility state for one index store.
48// Visibility matters because planner-visible indexes must already be complete:
49// the index contents are fully built and query-visible for reads.
50//
51#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
52pub enum IndexState {
53    Building,
54    #[default]
55    Ready,
56    Dropping,
57}
58
59impl IndexState {
60    /// Return the stable lowercase text label for this lifecycle state.
61    #[must_use]
62    pub const fn as_str(self) -> &'static str {
63        match self {
64            Self::Building => "building",
65            Self::Ready => "ready",
66            Self::Dropping => "dropping",
67        }
68    }
69}
70
71///
72/// IndexStore
73///
74/// Thin persistence wrapper over one stable or heap BTreeMap.
75///
76/// Invariant: callers provide already-validated `RawIndexStoreKey`/`IndexEntryValue`.
77///
78
79pub struct IndexStore {
80    pub(super) backend: IndexStoreBackend,
81    generation: u64,
82    state: IndexState,
83}
84
85pub(super) enum IndexStoreBackend {
86    Stable(StableBTreeMap<RawIndexStoreKey, IndexEntryValue, VirtualMemory<DefaultMemoryImpl>>),
87    Heap(HeapBTreeMap<RawIndexStoreKey, IndexEntryValue>),
88    Journaled {
89        canonical:
90            StableBTreeMap<RawIndexStoreKey, IndexEntryValue, VirtualMemory<DefaultMemoryImpl>>,
91        live: HeapBTreeMap<RawIndexStoreKey, IndexEntryValue>,
92        tombstones: BTreeSet<RawIndexStoreKey>,
93    },
94}
95
96/// Control-flow result for index-store traversal visitors.
97#[derive(Clone, Copy, Debug, Eq, PartialEq)]
98pub(in crate::db) enum IndexStoreVisit {
99    Continue,
100    Stop,
101}
102
103impl IndexStoreVisit {
104    const fn should_stop(self) -> bool {
105        matches!(self, Self::Stop)
106    }
107}
108
109impl IndexStore {
110    #[must_use]
111    pub fn init(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
112        Self {
113            backend: IndexStoreBackend::Stable(StableBTreeMap::init(memory)),
114            generation: 0,
115            // Existing stores default to Ready until one explicit build/drop
116            // lifecycle is introduced.
117            state: IndexState::Ready,
118        }
119    }
120
121    /// Initialize a volatile heap-backed index store.
122    #[must_use]
123    pub const fn init_heap() -> Self {
124        Self {
125            backend: IndexStoreBackend::Heap(HeapBTreeMap::new()),
126            generation: 0,
127            state: IndexState::Ready,
128        }
129    }
130
131    /// Initialize a journaled cached-stable index store.
132    ///
133    /// Normal writes update only the live materialized projection. The
134    /// canonical stable index is updated by future fold/rebuild paths.
135    #[must_use]
136    pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
137        Self {
138            backend: IndexStoreBackend::Journaled {
139                canonical: StableBTreeMap::init(memory),
140                live: HeapBTreeMap::new(),
141                tombstones: BTreeSet::new(),
142            },
143            generation: 0,
144            state: IndexState::Ready,
145        }
146    }
147
148    /// Visit all index entries in canonical store order without exposing the
149    /// backing stable-map iterator.
150    pub(in crate::db) fn visit_entries<E>(
151        &self,
152        mut visitor: impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<IndexStoreVisit, E>,
153    ) -> Result<(), E> {
154        match &self.backend {
155            IndexStoreBackend::Stable(map) => {
156                for entry in map.iter() {
157                    if visitor(entry.key(), &entry.value())?.should_stop() {
158                        return Ok(());
159                    }
160                }
161            }
162            IndexStoreBackend::Heap(map) => {
163                for (key, value) in map {
164                    if visitor(key, value)?.should_stop() {
165                        return Ok(());
166                    }
167                }
168            }
169            IndexStoreBackend::Journaled {
170                canonical: _,
171                live: _,
172                tombstones: _,
173            } => self.visit_journaled_entries_in_range(
174                (&Bound::Unbounded, &Bound::Unbounded),
175                Direction::Asc,
176                |key, value| visitor(key, value).map(IndexStoreVisit::should_stop),
177            )?,
178        }
179
180        Ok(())
181    }
182
183    pub(in crate::db) fn get(&self, key: &RawIndexStoreKey) -> Option<IndexEntryValue> {
184        match &self.backend {
185            IndexStoreBackend::Stable(map) => map.get(key),
186            IndexStoreBackend::Heap(map) => map.get(key).cloned(),
187            IndexStoreBackend::Journaled { .. } => Self::journaled_get(&self.backend, key),
188        }
189    }
190
191    pub fn len(&self) -> u64 {
192        match &self.backend {
193            IndexStoreBackend::Stable(map) => map.len(),
194            IndexStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
195            IndexStoreBackend::Journaled { .. } => {
196                let mut count = 0_u64;
197                let _: Result<(), std::convert::Infallible> = self.visit_entries(|_key, _value| {
198                    count = count.saturating_add(1);
199                    Ok(IndexStoreVisit::Continue)
200                });
201                count
202            }
203        }
204    }
205
206    pub fn is_empty(&self) -> bool {
207        match &self.backend {
208            IndexStoreBackend::Stable(map) => map.is_empty(),
209            IndexStoreBackend::Heap(map) => map.is_empty(),
210            IndexStoreBackend::Journaled { .. } => {
211                let mut empty = true;
212                let _: Result<(), std::convert::Infallible> = self.visit_entries(|_key, _value| {
213                    empty = false;
214                    Ok(IndexStoreVisit::Stop)
215                });
216                empty
217            }
218        }
219    }
220
221    #[must_use]
222    pub(in crate::db) const fn generation(&self) -> u64 {
223        self.generation
224    }
225
226    /// Return the explicit lifecycle state for this index store.
227    #[must_use]
228    pub(in crate::db) const fn state(&self) -> IndexState {
229        self.state
230    }
231
232    /// Mark this index store as in-progress and therefore ineligible for
233    /// planner visibility until a full authoritative rebuild ends.
234    pub(in crate::db) const fn mark_building(&mut self) {
235        self.state = IndexState::Building;
236    }
237
238    /// Mark this index store as fully built and planner-visible again.
239    pub(in crate::db) const fn mark_ready(&mut self) {
240        self.state = IndexState::Ready;
241    }
242
243    /// Mark this index store as dropping and therefore not planner-visible.
244    pub(in crate::db) const fn mark_dropping(&mut self) {
245        self.state = IndexState::Dropping;
246    }
247
248    pub(crate) fn insert(
249        &mut self,
250        key: RawIndexStoreKey,
251        entry: IndexEntryValue,
252    ) -> Option<IndexEntryValue> {
253        let previous_journaled = if matches!(self.backend, IndexStoreBackend::Journaled { .. }) {
254            self.get(&key)
255        } else {
256            None
257        };
258        let previous = match &mut self.backend {
259            IndexStoreBackend::Stable(map) => map.insert(key, entry),
260            IndexStoreBackend::Heap(map) => map.insert(key, entry),
261            IndexStoreBackend::Journaled {
262                live, tombstones, ..
263            } => {
264                tombstones.remove(&key);
265                live.insert(key, entry);
266                previous_journaled
267            }
268        };
269        self.bump_generation();
270        previous
271    }
272
273    pub(crate) fn remove(&mut self, key: &RawIndexStoreKey) -> Option<IndexEntryValue> {
274        let previous_journaled = if matches!(self.backend, IndexStoreBackend::Journaled { .. }) {
275            self.get(key)
276        } else {
277            None
278        };
279        let previous = match &mut self.backend {
280            IndexStoreBackend::Stable(map) => map.remove(key),
281            IndexStoreBackend::Heap(map) => map.remove(key),
282            IndexStoreBackend::Journaled {
283                live, tombstones, ..
284            } => {
285                live.remove(key);
286                tombstones.insert(key.clone());
287                previous_journaled
288            }
289        };
290        self.bump_generation();
291        previous
292    }
293
294    pub fn clear(&mut self) {
295        match &mut self.backend {
296            IndexStoreBackend::Stable(map) => map.clear_new(),
297            IndexStoreBackend::Heap(map) => map.clear(),
298            IndexStoreBackend::Journaled {
299                canonical,
300                live,
301                tombstones,
302            } => {
303                live.clear();
304                tombstones.clear();
305                for entry in canonical.iter() {
306                    tombstones.insert(entry.key().clone());
307                }
308            }
309        }
310        self.bump_generation();
311    }
312
313    /// Fold the current journaled materialized index view into the canonical
314    /// stable base and clear volatile projection state.
315    pub(in crate::db) fn fold_journaled_materialized_view(
316        &mut self,
317    ) -> Result<(), crate::error::InternalError> {
318        let entries = Self::journaled_entries_snapshot_for_fold(&self.backend);
319        let IndexStoreBackend::Journaled {
320            canonical,
321            live,
322            tombstones,
323        } = &mut self.backend
324        else {
325            return Err(crate::error::InternalError::store_invariant());
326        };
327
328        canonical.clear_new();
329        for (key, value) in entries {
330            canonical.insert(key, value);
331        }
332        live.clear();
333        tombstones.clear();
334        self.bump_generation();
335
336        Ok(())
337    }
338
339    /// Sum of bytes used by all stored index entries.
340    pub fn memory_bytes(&self) -> u64 {
341        let mut bytes = 0u64;
342        let _: Result<(), std::convert::Infallible> = self.visit_entries(|key, value| {
343            bytes = bytes.saturating_add(key.as_bytes().len() as u64 + value.len() as u64);
344            Ok(IndexStoreVisit::Continue)
345        });
346        bytes
347    }
348
349    const fn bump_generation(&mut self) {
350        self.generation = self.generation.saturating_add(1);
351    }
352
353    #[cfg(test)]
354    #[must_use]
355    pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
356        match &self.backend {
357            IndexStoreBackend::Stable(map)
358            | IndexStoreBackend::Journaled { canonical: map, .. } => map.len(),
359            IndexStoreBackend::Heap(_) => 0,
360        }
361    }
362
363    fn journaled_get(
364        backend: &IndexStoreBackend,
365        key: &RawIndexStoreKey,
366    ) -> Option<IndexEntryValue> {
367        let IndexStoreBackend::Journaled {
368            canonical,
369            live,
370            tombstones,
371        } = backend
372        else {
373            return None;
374        };
375
376        if tombstones.contains(key) {
377            return None;
378        }
379        live.get(key).cloned().or_else(|| canonical.get(key))
380    }
381
382    pub(super) fn journaled_entries_snapshot_for_fold(
383        backend: &IndexStoreBackend,
384    ) -> HeapBTreeMap<RawIndexStoreKey, IndexEntryValue> {
385        #[cfg(test)]
386        record_journaled_snapshot_call();
387
388        let IndexStoreBackend::Journaled {
389            canonical,
390            live,
391            tombstones,
392        } = backend
393        else {
394            return HeapBTreeMap::new();
395        };
396
397        let mut entries = HeapBTreeMap::new();
398        for entry in canonical.iter() {
399            let key = entry.key().clone();
400            if !tombstones.contains(&key) {
401                entries.insert(key, entry.value());
402            }
403        }
404        for (key, value) in live {
405            if !tombstones.contains(key) {
406                entries.insert(key.clone(), value.clone());
407            }
408        }
409
410        entries
411    }
412
413    pub(super) fn visit_journaled_entries_in_range<E>(
414        &self,
415        bounds: (&Bound<RawIndexStoreKey>, &Bound<RawIndexStoreKey>),
416        direction: Direction,
417        mut visit: impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<bool, E>,
418    ) -> Result<(), E> {
419        let IndexStoreBackend::Journaled {
420            canonical,
421            live,
422            tombstones,
423        } = &self.backend
424        else {
425            return Ok(());
426        };
427
428        let lower = bounds.0.clone();
429        let upper = bounds.1.clone();
430        match direction {
431            Direction::Asc if canonical.is_empty() => {
432                for (key, value) in live.range((lower, upper)) {
433                    if visit(key, value)? {
434                        return Ok(());
435                    }
436                }
437            }
438            Direction::Desc if canonical.is_empty() => {
439                for (key, value) in live.range((lower, upper)).rev() {
440                    if visit(key, value)? {
441                        return Ok(());
442                    }
443                }
444            }
445            Direction::Asc if live.is_empty() && tombstones.is_empty() => {
446                for entry in canonical.range((lower, upper)) {
447                    if visit(entry.key(), &entry.value())? {
448                        return Ok(());
449                    }
450                }
451            }
452            Direction::Desc if live.is_empty() && tombstones.is_empty() => {
453                for entry in canonical.range((lower, upper)).rev() {
454                    if visit(entry.key(), &entry.value())? {
455                        return Ok(());
456                    }
457                }
458            }
459            Direction::Asc => {
460                visit_ordered_overlay(
461                    canonical.range((lower.clone(), upper.clone())),
462                    live.range((lower, upper)),
463                    direction,
464                    |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
465                    |canonical_entry| !tombstones.contains(canonical_entry.key()),
466                    |live_entry| !tombstones.contains(live_entry.0),
467                    |entry| {
468                        let should_stop = match entry {
469                            OrderedOverlayEntry::Canonical(canonical_entry) => {
470                                visit(canonical_entry.key(), &canonical_entry.value())?
471                            }
472                            OrderedOverlayEntry::Live((key, value)) => visit(key, value)?,
473                        };
474                        Ok(if should_stop {
475                            OrderedOverlayVisit::Stop
476                        } else {
477                            OrderedOverlayVisit::Continue
478                        })
479                    },
480                )?;
481            }
482            Direction::Desc => {
483                visit_ordered_overlay(
484                    canonical.range((lower.clone(), upper.clone())).rev(),
485                    live.range((lower, upper)).rev(),
486                    direction,
487                    |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
488                    |canonical_entry| !tombstones.contains(canonical_entry.key()),
489                    |live_entry| !tombstones.contains(live_entry.0),
490                    |entry| {
491                        let should_stop = match entry {
492                            OrderedOverlayEntry::Canonical(canonical_entry) => {
493                                visit(canonical_entry.key(), &canonical_entry.value())?
494                            }
495                            OrderedOverlayEntry::Live((key, value)) => visit(key, value)?,
496                        };
497                        Ok(if should_stop {
498                            OrderedOverlayVisit::Stop
499                        } else {
500                            OrderedOverlayVisit::Continue
501                        })
502                    },
503                )?;
504            }
505        }
506
507        Ok(())
508    }
509}
510
511#[cfg(test)]
512mod tests {
513    use super::*;
514    use crate::{db::direction::Direction, testing::test_memory, traits::Storable};
515    use std::{borrow::Cow, convert::Infallible};
516
517    fn raw_key(value: u8) -> RawIndexStoreKey {
518        <RawIndexStoreKey as Storable>::from_bytes(Cow::Owned(vec![value]))
519    }
520
521    #[test]
522    fn journaled_mixed_index_range_traversal_streams_without_snapshot() {
523        let mut store = IndexStore::init_journaled(test_memory(93));
524        for value in [1_u8, 3, 5] {
525            store.insert(raw_key(value), IndexEntryValue::presence());
526        }
527        store
528            .fold_journaled_materialized_view()
529            .expect("canonical index seed should fold");
530
531        store.insert(raw_key(0), IndexEntryValue::presence());
532        store.insert(raw_key(4), IndexEntryValue::presence());
533        store.insert(raw_key(5), IndexEntryValue::presence());
534        store.remove(&raw_key(1));
535
536        let lower = Bound::Included(raw_key(0));
537        let upper = Bound::Included(raw_key(5));
538
539        reset_journaled_snapshot_call_count_for_tests();
540        let mut asc = Vec::new();
541        store
542            .visit_journaled_entries_in_range((&lower, &upper), Direction::Asc, |key, _value| {
543                asc.push(key.as_bytes()[0]);
544                Ok::<_, Infallible>(asc.len() == 2)
545            })
546            .expect("asc journaled index range traversal should succeed");
547        assert_eq!(asc, vec![0, 3]);
548        assert_eq!(
549            journaled_snapshot_call_count_for_tests(),
550            0,
551            "mixed journaled index range traversal should preserve early stop without materializing a snapshot",
552        );
553
554        reset_journaled_snapshot_call_count_for_tests();
555        let mut desc = Vec::new();
556        store
557            .visit_journaled_entries_in_range((&lower, &upper), Direction::Desc, |key, _value| {
558                desc.push(key.as_bytes()[0]);
559                Ok::<_, Infallible>(desc.len() == 2)
560            })
561            .expect("desc journaled index range traversal should succeed");
562        assert_eq!(desc, vec![5, 4]);
563        assert_eq!(
564            journaled_snapshot_call_count_for_tests(),
565            0,
566            "mixed reverse journaled index range traversal should preserve early stop without materializing a snapshot",
567        );
568    }
569}