// Source file: icydb_core/db/data/store.rs

1//! Module: data::store
2//! Responsibility: journaled-or-heap row storage behind the data-store boundary.
3//! Does not own: key/row validation policy beyond type boundaries.
4//! Boundary: commit/executor call into this layer after prevalidation.
5
6use crate::{
7    db::{
8        data::{CanonicalRow, RawDataStoreKey, RawRow},
9        direction::Direction,
10        key_taxonomy::RawDataStoreKeyRange,
11        ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
12    },
13    types::EntityTag,
14};
15use ic_memory::stable_structures::{
16    BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
17};
18#[cfg(feature = "diagnostics")]
19use std::cell::Cell;
20use std::collections::{BTreeMap as HeapBTreeMap, BTreeSet};
21use std::convert::Infallible;
22use std::ops::{Bound, RangeBounds};
23
#[cfg(feature = "diagnostics")]
thread_local! {
    /// Per-thread tally of `DataStore::get` invocations (diagnostics builds only).
    static DATA_STORE_GET_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
}

/// Bump this thread's `DataStore::get` tally, saturating at `u64::MAX`.
#[cfg(feature = "diagnostics")]
fn record_data_store_get_call() {
    DATA_STORE_GET_CALL_COUNT.with(|tally| tally.set(tally.get().saturating_add(1)));
}
35
36///
37/// DataStore
38///
39/// Thin persistence wrapper over one journaled or heap BTreeMap.
40///
41/// Invariant: callers provide already-validated `RawDataStoreKey` and canonical row bytes.
42/// This type intentionally does not enforce commit-phase ordering.
43///
44
45pub struct DataStore {
46    backend: DataStoreBackend,
47    generation: u64,
48}
49
50enum DataStoreBackend {
51    Heap(HeapBTreeMap<RawDataStoreKey, RawRow>),
52    Journaled {
53        canonical: StableBTreeMap<RawDataStoreKey, RawRow, VirtualMemory<DefaultMemoryImpl>>,
54        live: HeapBTreeMap<RawDataStoreKey, RawRow>,
55        tombstones: BTreeSet<RawDataStoreKey>,
56    },
57}
58
59/// Control-flow result for store traversal visitors.
60#[derive(Clone, Copy, Debug, Eq, PartialEq)]
61pub(in crate::db) enum StoreVisit {
62    Continue,
63    Stop,
64}
65
66impl StoreVisit {
67    const fn should_stop(self) -> bool {
68        matches!(self, Self::Stop)
69    }
70}
71
72impl DataStore {
73    /// Initialize a volatile heap-backed data store.
74    #[must_use]
75    pub const fn init_heap() -> Self {
76        Self {
77            backend: DataStoreBackend::Heap(HeapBTreeMap::new()),
78            generation: 0,
79        }
80    }
81
82    /// Initialize a journaled cached-stable data store.
83    ///
84    /// Normal writes update only the live projection. The canonical stable map
85    /// is the future fold target and is not mutated by this wrapper's write
86    /// methods.
87    #[must_use]
88    pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
89        Self {
90            backend: DataStoreBackend::Journaled {
91                canonical: StableBTreeMap::init(memory),
92                live: HeapBTreeMap::new(),
93                tombstones: BTreeSet::new(),
94            },
95            generation: 0,
96        }
97    }
98
99    /// Insert or replace one row by raw key.
100    pub(in crate::db) fn insert(
101        &mut self,
102        key: RawDataStoreKey,
103        row: CanonicalRow,
104    ) -> Option<RawRow> {
105        let row = row.into_raw_row();
106        let previous_journaled = if matches!(self.backend, DataStoreBackend::Journaled { .. }) {
107            self.get(&key)
108        } else {
109            None
110        };
111        let previous = match &mut self.backend {
112            DataStoreBackend::Heap(map) => map.insert(key, row),
113            DataStoreBackend::Journaled {
114                live, tombstones, ..
115            } => {
116                tombstones.remove(&key);
117                live.insert(key, row);
118                previous_journaled
119            }
120        };
121        self.bump_generation();
122        previous
123    }
124
125    /// Insert one raw row directly for corruption-focused test setup only.
126    #[cfg(test)]
127    pub(in crate::db) fn insert_raw_for_test(
128        &mut self,
129        key: RawDataStoreKey,
130        row: RawRow,
131    ) -> Option<RawRow> {
132        let previous_journaled = if matches!(self.backend, DataStoreBackend::Journaled { .. }) {
133            self.get(&key)
134        } else {
135            None
136        };
137        let previous = match &mut self.backend {
138            DataStoreBackend::Heap(map) => map.insert(key, row),
139            DataStoreBackend::Journaled {
140                live, tombstones, ..
141            } => {
142                tombstones.remove(&key);
143                live.insert(key, row);
144                previous_journaled
145            }
146        };
147        self.bump_generation();
148        previous
149    }
150
151    /// Remove one row by raw key.
152    pub(in crate::db) fn remove(&mut self, key: &RawDataStoreKey) -> Option<RawRow> {
153        let previous_journaled = if matches!(self.backend, DataStoreBackend::Journaled { .. }) {
154            self.get(key)
155        } else {
156            None
157        };
158        let previous = match &mut self.backend {
159            DataStoreBackend::Heap(map) => map.remove(key),
160            DataStoreBackend::Journaled {
161                live, tombstones, ..
162            } => {
163                live.remove(key);
164                tombstones.insert(key.clone());
165                previous_journaled
166            }
167        };
168        self.bump_generation();
169        previous
170    }
171
172    /// Reset the volatile projection for journaled recovery without mutating
173    /// the canonical stable base.
174    pub(in crate::db) fn reset_journaled_live_projection(
175        &mut self,
176    ) -> Result<(), crate::error::InternalError> {
177        let DataStoreBackend::Journaled {
178            live, tombstones, ..
179        } = &mut self.backend
180        else {
181            return Err(crate::error::InternalError::store_invariant());
182        };
183
184        live.clear();
185        tombstones.clear();
186        self.bump_generation();
187
188        Ok(())
189    }
190
191    /// Apply one recovered journal row put into the volatile projection.
192    pub(in crate::db) fn apply_recovered_journal_put(
193        &mut self,
194        key: RawDataStoreKey,
195        row: RawRow,
196    ) -> Result<Option<RawRow>, crate::error::InternalError> {
197        let DataStoreBackend::Journaled {
198            canonical,
199            live,
200            tombstones,
201        } = &mut self.backend
202        else {
203            return Err(crate::error::InternalError::store_invariant());
204        };
205
206        let previous = if tombstones.contains(&key) {
207            None
208        } else {
209            live.get(&key).cloned().or_else(|| canonical.get(&key))
210        };
211        tombstones.remove(&key);
212        live.insert(key, row);
213        self.bump_generation();
214
215        Ok(previous)
216    }
217
218    /// Apply one recovered journal row delete into the volatile projection.
219    pub(in crate::db) fn apply_recovered_journal_delete(
220        &mut self,
221        key: &RawDataStoreKey,
222    ) -> Result<Option<RawRow>, crate::error::InternalError> {
223        let DataStoreBackend::Journaled {
224            canonical,
225            live,
226            tombstones,
227        } = &mut self.backend
228        else {
229            return Err(crate::error::InternalError::store_invariant());
230        };
231
232        let previous = if tombstones.contains(key) {
233            None
234        } else {
235            live.get(key).cloned().or_else(|| canonical.get(key))
236        };
237        live.remove(key);
238        tombstones.insert(key.clone());
239        self.bump_generation();
240
241        Ok(previous)
242    }
243
244    /// Apply one folded journal row put into the canonical stable base.
245    pub(in crate::db) fn fold_recovered_journal_put(
246        &mut self,
247        key: RawDataStoreKey,
248        row: RawRow,
249    ) -> Result<Option<RawRow>, crate::error::InternalError> {
250        let DataStoreBackend::Journaled { canonical, .. } = &mut self.backend else {
251            return Err(crate::error::InternalError::store_invariant());
252        };
253
254        let previous = canonical.insert(key, row);
255        self.bump_generation();
256
257        Ok(previous)
258    }
259
260    /// Apply one folded journal row delete into the canonical stable base.
261    pub(in crate::db) fn fold_recovered_journal_delete(
262        &mut self,
263        key: &RawDataStoreKey,
264    ) -> Result<Option<RawRow>, crate::error::InternalError> {
265        let DataStoreBackend::Journaled { canonical, .. } = &mut self.backend else {
266            return Err(crate::error::InternalError::store_invariant());
267        };
268
269        let previous = canonical.remove(key);
270        self.bump_generation();
271
272        Ok(previous)
273    }
274
275    /// Load one row by raw key.
276    pub(in crate::db) fn get(&self, key: &RawDataStoreKey) -> Option<RawRow> {
277        #[cfg(feature = "diagnostics")]
278        record_data_store_get_call();
279
280        match &self.backend {
281            DataStoreBackend::Heap(map) => map.get(key).cloned(),
282            DataStoreBackend::Journaled { .. } => Self::journaled_get_raw(&self.backend, key),
283        }
284    }
285
286    /// Return whether one raw key exists without cloning the row payload.
287    #[must_use]
288    pub(in crate::db) fn contains(&self, key: &RawDataStoreKey) -> bool {
289        match &self.backend {
290            DataStoreBackend::Heap(map) => map.contains_key(key),
291            DataStoreBackend::Journaled { .. } => {
292                Self::journaled_get_raw(&self.backend, key).is_some()
293            }
294        }
295    }
296
297    /// Clear all stored rows from the data store.
298    #[cfg(test)]
299    pub(in crate::db) fn clear(&mut self) {
300        match &mut self.backend {
301            DataStoreBackend::Heap(map) => map.clear(),
302            DataStoreBackend::Journaled {
303                canonical,
304                live,
305                tombstones,
306            } => {
307                canonical.clear_new();
308                live.clear();
309                tombstones.clear();
310            }
311        }
312        self.bump_generation();
313    }
314
315    /// Return the number of stored rows without exposing the backing map.
316    #[must_use]
317    pub(in crate::db) fn len(&self) -> u64 {
318        match &self.backend {
319            DataStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
320            DataStoreBackend::Journaled { .. } => {
321                let mut count = 0_u64;
322                let _: Result<(), Infallible> = self.visit_entries(|_key, _row| {
323                    count = count.saturating_add(1);
324                    Ok(StoreVisit::Continue)
325                });
326                count
327            }
328        }
329    }
330
331    /// Return the row-store generation used to prove index metadata freshness.
332    #[must_use]
333    pub(in crate::db) const fn generation(&self) -> u64 {
334        self.generation
335    }
336
337    /// Return whether the data store currently contains no rows.
338    #[must_use]
339    #[cfg(test)]
340    pub(in crate::db) fn is_empty(&self) -> bool {
341        match &self.backend {
342            DataStoreBackend::Heap(map) => map.is_empty(),
343            DataStoreBackend::Journaled { .. } => {
344                let mut empty = true;
345                let _: Result<(), Infallible> = self.visit_entries(|_key, _row| {
346                    empty = false;
347                    Ok(StoreVisit::Stop)
348                });
349                empty
350            }
351        }
352    }
353
354    /// Visit raw row entries in canonical storage order.
355    pub(in crate::db) fn visit_entries<E>(
356        &self,
357        mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
358    ) -> Result<(), E> {
359        match &self.backend {
360            DataStoreBackend::Heap(map) => {
361                for (key, row) in map {
362                    if visitor(key, row)?.should_stop() {
363                        break;
364                    }
365                }
366            }
367            DataStoreBackend::Journaled {
368                canonical: _,
369                live: _,
370                tombstones: _,
371            } => Self::visit_journaled_entries_in_bounds(
372                &self.backend,
373                (Bound::Unbounded, Bound::Unbounded),
374                false,
375                visitor,
376            )?,
377        }
378
379        Ok(())
380    }
381
382    /// Visit raw row entries in reverse canonical storage order.
383    pub(in crate::db) fn visit_entries_rev<E>(
384        &self,
385        mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
386    ) -> Result<(), E> {
387        match &self.backend {
388            DataStoreBackend::Heap(map) => {
389                for (key, row) in map.iter().rev() {
390                    if visitor(key, row)?.should_stop() {
391                        break;
392                    }
393                }
394            }
395            DataStoreBackend::Journaled {
396                canonical: _,
397                live: _,
398                tombstones: _,
399            } => Self::visit_journaled_entries_in_bounds(
400                &self.backend,
401                (Bound::Unbounded, Bound::Unbounded),
402                true,
403                visitor,
404            )?,
405        }
406
407        Ok(())
408    }
409
410    /// Visit raw row entries whose keys belong to the provided storage range.
411    pub(in crate::db) fn visit_range<E>(
412        &self,
413        key_range: impl RangeBounds<RawDataStoreKey>,
414        mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
415    ) -> Result<(), E> {
416        let bounds = Self::owned_range_bounds(&key_range);
417        match &self.backend {
418            DataStoreBackend::Heap(map) => {
419                for (key, row) in map.range((bounds.0.clone(), bounds.1)) {
420                    if visitor(key, row)?.should_stop() {
421                        break;
422                    }
423                }
424            }
425            DataStoreBackend::Journaled {
426                canonical: _,
427                live: _,
428                tombstones: _,
429            } => Self::visit_journaled_entries_in_bounds(&self.backend, bounds, false, visitor)?,
430        }
431
432        Ok(())
433    }
434
435    /// Visit raw row entries in reverse order whose keys belong to the provided storage range.
436    pub(in crate::db) fn visit_range_rev<E>(
437        &self,
438        key_range: impl RangeBounds<RawDataStoreKey>,
439        mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
440    ) -> Result<(), E> {
441        let bounds = Self::owned_range_bounds(&key_range);
442        match &self.backend {
443            DataStoreBackend::Heap(map) => {
444                for (key, row) in map.range((bounds.0.clone(), bounds.1)).rev() {
445                    if visitor(key, row)?.should_stop() {
446                        break;
447                    }
448                }
449            }
450            DataStoreBackend::Journaled {
451                canonical: _,
452                live: _,
453                tombstones: _,
454            } => Self::visit_journaled_entries_in_bounds(&self.backend, bounds, true, visitor)?,
455        }
456
457        Ok(())
458    }
459
460    /// Visit raw row entries for one entity using compact prefix bounds.
461    pub(in crate::db) fn visit_entity<E>(
462        &self,
463        entity: EntityTag,
464        visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
465    ) -> Result<(), E> {
466        let range = RawDataStoreKeyRange::entity_prefix(entity);
467        self.visit_range(RawDataStoreKey::store_range_bounds(&range), visitor)
468    }
469
470    /// Sum of bytes used by all stored rows.
471    pub(in crate::db) fn memory_bytes(&self) -> u64 {
472        // Report map footprint as key bytes + row bytes per entry.
473        let mut bytes = 0u64;
474        let _: Result<(), Infallible> = self.visit_entries(|key, row| {
475            bytes = bytes.saturating_add(key.as_bytes().len() as u64 + row.len() as u64);
476            Ok(StoreVisit::Continue)
477        });
478        bytes
479    }
480
481    const fn bump_generation(&mut self) {
482        self.generation = self.generation.saturating_add(1);
483    }
484
485    /// Return the monotonic perf-only count of stable row fetches seen by this process.
486    #[cfg(feature = "diagnostics")]
487    pub(in crate::db) fn current_get_call_count() -> u64 {
488        DATA_STORE_GET_CALL_COUNT.with(Cell::get)
489    }
490
491    #[cfg(test)]
492    #[must_use]
493    pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
494        match &self.backend {
495            DataStoreBackend::Journaled { canonical: map, .. } => map.len(),
496            DataStoreBackend::Heap(_) => 0,
497        }
498    }
499
500    fn journaled_get_raw(backend: &DataStoreBackend, key: &RawDataStoreKey) -> Option<RawRow> {
501        let DataStoreBackend::Journaled {
502            canonical,
503            live,
504            tombstones,
505        } = backend
506        else {
507            return None;
508        };
509
510        if tombstones.contains(key) {
511            return None;
512        }
513        live.get(key).cloned().or_else(|| canonical.get(key))
514    }
515
516    fn owned_range_bounds(
517        key_range: &impl RangeBounds<RawDataStoreKey>,
518    ) -> (Bound<RawDataStoreKey>, Bound<RawDataStoreKey>) {
519        let lower = match key_range.start_bound() {
520            Bound::Included(key) => Bound::Included(key.clone()),
521            Bound::Excluded(key) => Bound::Excluded(key.clone()),
522            Bound::Unbounded => Bound::Unbounded,
523        };
524        let upper = match key_range.end_bound() {
525            Bound::Included(key) => Bound::Included(key.clone()),
526            Bound::Excluded(key) => Bound::Excluded(key.clone()),
527            Bound::Unbounded => Bound::Unbounded,
528        };
529
530        (lower, upper)
531    }
532
533    fn visit_journaled_entries_in_bounds<E>(
534        backend: &DataStoreBackend,
535        bounds: (Bound<RawDataStoreKey>, Bound<RawDataStoreKey>),
536        reverse: bool,
537        mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
538    ) -> Result<(), E> {
539        let DataStoreBackend::Journaled {
540            canonical,
541            live,
542            tombstones,
543        } = backend
544        else {
545            return Ok(());
546        };
547
548        if canonical.is_empty() {
549            if reverse {
550                for (key, row) in live.range(bounds).rev() {
551                    if visitor(key, row)?.should_stop() {
552                        return Ok(());
553                    }
554                }
555            } else {
556                for (key, row) in live.range(bounds) {
557                    if visitor(key, row)?.should_stop() {
558                        return Ok(());
559                    }
560                }
561            }
562            return Ok(());
563        }
564
565        if live.is_empty() && tombstones.is_empty() {
566            if reverse {
567                for entry in canonical.range(bounds).rev() {
568                    if visitor(entry.key(), &entry.value())?.should_stop() {
569                        return Ok(());
570                    }
571                }
572            } else {
573                for entry in canonical.range(bounds) {
574                    if visitor(entry.key(), &entry.value())?.should_stop() {
575                        return Ok(());
576                    }
577                }
578            }
579            return Ok(());
580        }
581
582        match if reverse {
583            Direction::Desc
584        } else {
585            Direction::Asc
586        } {
587            Direction::Asc => visit_ordered_overlay(
588                canonical.range((bounds.0.clone(), bounds.1.clone())),
589                live.range((bounds.0, bounds.1)),
590                Direction::Asc,
591                |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
592                |canonical_entry| !tombstones.contains(canonical_entry.key()),
593                |live_entry| !tombstones.contains(live_entry.0),
594                |entry| {
595                    let visit = match entry {
596                        OrderedOverlayEntry::Canonical(canonical_entry) => {
597                            visitor(canonical_entry.key(), &canonical_entry.value())?
598                        }
599                        OrderedOverlayEntry::Live((key, row)) => visitor(key, row)?,
600                    };
601                    Ok(if visit.should_stop() {
602                        OrderedOverlayVisit::Stop
603                    } else {
604                        OrderedOverlayVisit::Continue
605                    })
606                },
607            ),
608            Direction::Desc => visit_ordered_overlay(
609                canonical.range((bounds.0.clone(), bounds.1.clone())).rev(),
610                live.range((bounds.0, bounds.1)).rev(),
611                Direction::Desc,
612                |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
613                |canonical_entry| !tombstones.contains(canonical_entry.key()),
614                |live_entry| !tombstones.contains(live_entry.0),
615                |entry| {
616                    let visit = match entry {
617                        OrderedOverlayEntry::Canonical(canonical_entry) => {
618                            visitor(canonical_entry.key(), &canonical_entry.value())?
619                        }
620                        OrderedOverlayEntry::Live((key, row)) => visitor(key, row)?,
621                    };
622                    Ok(if visit.should_stop() {
623                        OrderedOverlayVisit::Stop
624                    } else {
625                        OrderedOverlayVisit::Continue
626                    })
627                },
628            ),
629        }
630    }
631}
632
// Unit tests for this module, compiled only under `cfg(test)`. Per Rust's
// module path rules, the body is loaded from the `tests` submodule file next
// to `store.rs` (`store/tests.rs` or `store/tests/mod.rs`).
#[cfg(test)]
mod tests;