Skip to main content

icydb_core/db/data/
store.rs

1//! Module: data::store
2//! Responsibility: journaled-or-heap row storage behind the data-store boundary.
3//! Does not own: key/row validation policy beyond type boundaries.
4//! Boundary: commit/executor call into this layer after prevalidation.
5
6use crate::{
7    db::{
8        data::{CanonicalRow, RawDataStoreKey, RawRow},
9        direction::Direction,
10        key_taxonomy::RawDataStoreKeyRange,
11        ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
12    },
13    types::EntityTag,
14};
15use ic_memory::stable_structures::{
16    BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
17};
18#[cfg(feature = "diagnostics")]
19use std::cell::Cell;
20use std::collections::{BTreeMap as HeapBTreeMap, BTreeSet};
21use std::convert::Infallible;
22use std::ops::{Bound, RangeBounds};
23
#[cfg(feature = "diagnostics")]
thread_local! {
    // Per-thread tally of `DataStore::get` calls. It only exists when the
    // `diagnostics` feature is enabled and starts at zero on each thread.
    static DATA_STORE_GET_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
}
28
/// Bump the thread-local `get` call tally by one.
///
/// The increment saturates at `u64::MAX` instead of wrapping.
#[cfg(feature = "diagnostics")]
fn record_data_store_get_call() {
    DATA_STORE_GET_CALL_COUNT.with(|counter| {
        let bumped = counter.get().saturating_add(1);
        counter.set(bumped);
    });
}
35
///
/// DataStore
///
/// Thin persistence wrapper over one journaled or heap BTreeMap.
///
/// Invariant: callers provide already-validated `RawDataStoreKey` and canonical row bytes.
/// This type intentionally does not enforce commit-phase ordering.
///

pub struct DataStore {
    /// Storage strategy chosen at construction (`init_heap` or `init_journaled`).
    backend: DataStoreBackend,
}
48
/// Storage strategies behind `DataStore`.
enum DataStoreBackend {
    /// Rows held in a single heap map.
    Heap(HeapBTreeMap<RawDataStoreKey, RawRow>),
    /// Stable base map plus an in-heap overlay of writes and deletions.
    ///
    /// A key is visible when it is not in `tombstones` and is present in
    /// `live` or, failing that, in `canonical` (see `journaled_get_raw`).
    Journaled {
        /// Stable map; only the `fold_recovered_journal_*` methods and the
        /// test-only `clear` mutate it in this file.
        canonical: StableBTreeMap<RawDataStoreKey, RawRow, VirtualMemory<DefaultMemoryImpl>>,
        /// Heap overlay that receives normal inserts; read before `canonical`.
        live: HeapBTreeMap<RawDataStoreKey, RawRow>,
        /// Keys recorded as removed; they mask entries in both maps.
        tombstones: BTreeSet<RawDataStoreKey>,
    },
}
57
/// Control-flow result for store traversal visitors.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) enum StoreVisit {
    /// Keep visiting subsequent entries.
    Continue,
    /// End the traversal early; the visit methods then return `Ok(())`.
    Stop,
}
64
65impl StoreVisit {
66    const fn should_stop(self) -> bool {
67        matches!(self, Self::Stop)
68    }
69}
70
71impl DataStore {
72    /// Initialize a volatile heap-backed data store.
73    #[must_use]
74    pub const fn init_heap() -> Self {
75        Self {
76            backend: DataStoreBackend::Heap(HeapBTreeMap::new()),
77        }
78    }
79
80    /// Initialize a journaled cached-stable data store.
81    ///
82    /// Normal writes update only the live projection. The canonical stable map
83    /// is the future fold target and is not mutated by this wrapper's write
84    /// methods.
85    #[must_use]
86    pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
87        Self {
88            backend: DataStoreBackend::Journaled {
89                canonical: StableBTreeMap::init(memory),
90                live: HeapBTreeMap::new(),
91                tombstones: BTreeSet::new(),
92            },
93        }
94    }
95
96    /// Insert or replace one row by raw key.
97    pub(in crate::db) fn insert(
98        &mut self,
99        key: RawDataStoreKey,
100        row: CanonicalRow,
101    ) -> Option<RawRow> {
102        let row = row.into_raw_row();
103        let previous_journaled = if matches!(self.backend, DataStoreBackend::Journaled { .. }) {
104            self.get(&key)
105        } else {
106            None
107        };
108        match &mut self.backend {
109            DataStoreBackend::Heap(map) => map.insert(key, row),
110            DataStoreBackend::Journaled {
111                live, tombstones, ..
112            } => {
113                tombstones.remove(&key);
114                live.insert(key, row);
115                previous_journaled
116            }
117        }
118    }
119
120    /// Insert one raw row directly for corruption-focused test setup only.
121    #[cfg(test)]
122    pub(in crate::db) fn insert_raw_for_test(
123        &mut self,
124        key: RawDataStoreKey,
125        row: RawRow,
126    ) -> Option<RawRow> {
127        let previous_journaled = if matches!(self.backend, DataStoreBackend::Journaled { .. }) {
128            self.get(&key)
129        } else {
130            None
131        };
132        match &mut self.backend {
133            DataStoreBackend::Heap(map) => map.insert(key, row),
134            DataStoreBackend::Journaled {
135                live, tombstones, ..
136            } => {
137                tombstones.remove(&key);
138                live.insert(key, row);
139                previous_journaled
140            }
141        }
142    }
143
144    /// Remove one row by raw key.
145    pub(in crate::db) fn remove(&mut self, key: &RawDataStoreKey) -> Option<RawRow> {
146        let previous_journaled = if matches!(self.backend, DataStoreBackend::Journaled { .. }) {
147            self.get(key)
148        } else {
149            None
150        };
151        match &mut self.backend {
152            DataStoreBackend::Heap(map) => map.remove(key),
153            DataStoreBackend::Journaled {
154                live, tombstones, ..
155            } => {
156                live.remove(key);
157                tombstones.insert(key.clone());
158                previous_journaled
159            }
160        }
161    }
162
163    /// Reset the volatile projection for journaled recovery without mutating
164    /// the canonical stable base.
165    pub(in crate::db) fn reset_journaled_live_projection(
166        &mut self,
167    ) -> Result<(), crate::error::InternalError> {
168        let DataStoreBackend::Journaled {
169            live, tombstones, ..
170        } = &mut self.backend
171        else {
172            return Err(crate::error::InternalError::store_invariant());
173        };
174
175        live.clear();
176        tombstones.clear();
177
178        Ok(())
179    }
180
181    /// Apply one recovered journal row put into the volatile projection.
182    pub(in crate::db) fn apply_recovered_journal_put(
183        &mut self,
184        key: RawDataStoreKey,
185        row: RawRow,
186    ) -> Result<Option<RawRow>, crate::error::InternalError> {
187        let DataStoreBackend::Journaled {
188            canonical,
189            live,
190            tombstones,
191        } = &mut self.backend
192        else {
193            return Err(crate::error::InternalError::store_invariant());
194        };
195
196        let previous = if tombstones.contains(&key) {
197            None
198        } else {
199            live.get(&key).cloned().or_else(|| canonical.get(&key))
200        };
201        tombstones.remove(&key);
202        live.insert(key, row);
203
204        Ok(previous)
205    }
206
207    /// Apply one recovered journal row delete into the volatile projection.
208    pub(in crate::db) fn apply_recovered_journal_delete(
209        &mut self,
210        key: &RawDataStoreKey,
211    ) -> Result<Option<RawRow>, crate::error::InternalError> {
212        let DataStoreBackend::Journaled {
213            canonical,
214            live,
215            tombstones,
216        } = &mut self.backend
217        else {
218            return Err(crate::error::InternalError::store_invariant());
219        };
220
221        let previous = if tombstones.contains(key) {
222            None
223        } else {
224            live.get(key).cloned().or_else(|| canonical.get(key))
225        };
226        live.remove(key);
227        tombstones.insert(key.clone());
228
229        Ok(previous)
230    }
231
232    /// Apply one folded journal row put into the canonical stable base.
233    pub(in crate::db) fn fold_recovered_journal_put(
234        &mut self,
235        key: RawDataStoreKey,
236        row: RawRow,
237    ) -> Result<Option<RawRow>, crate::error::InternalError> {
238        let DataStoreBackend::Journaled { canonical, .. } = &mut self.backend else {
239            return Err(crate::error::InternalError::store_invariant());
240        };
241
242        Ok(canonical.insert(key, row))
243    }
244
245    /// Apply one folded journal row delete into the canonical stable base.
246    pub(in crate::db) fn fold_recovered_journal_delete(
247        &mut self,
248        key: &RawDataStoreKey,
249    ) -> Result<Option<RawRow>, crate::error::InternalError> {
250        let DataStoreBackend::Journaled { canonical, .. } = &mut self.backend else {
251            return Err(crate::error::InternalError::store_invariant());
252        };
253
254        Ok(canonical.remove(key))
255    }
256
257    /// Load one row by raw key.
258    pub(in crate::db) fn get(&self, key: &RawDataStoreKey) -> Option<RawRow> {
259        #[cfg(feature = "diagnostics")]
260        record_data_store_get_call();
261
262        match &self.backend {
263            DataStoreBackend::Heap(map) => map.get(key).cloned(),
264            DataStoreBackend::Journaled { .. } => Self::journaled_get_raw(&self.backend, key),
265        }
266    }
267
268    /// Return whether one raw key exists without cloning the row payload.
269    #[must_use]
270    pub(in crate::db) fn contains(&self, key: &RawDataStoreKey) -> bool {
271        match &self.backend {
272            DataStoreBackend::Heap(map) => map.contains_key(key),
273            DataStoreBackend::Journaled { .. } => {
274                Self::journaled_get_raw(&self.backend, key).is_some()
275            }
276        }
277    }
278
279    /// Clear all stored rows from the data store.
280    #[cfg(test)]
281    pub(in crate::db) fn clear(&mut self) {
282        match &mut self.backend {
283            DataStoreBackend::Heap(map) => map.clear(),
284            DataStoreBackend::Journaled {
285                canonical,
286                live,
287                tombstones,
288            } => {
289                canonical.clear_new();
290                live.clear();
291                tombstones.clear();
292            }
293        }
294    }
295
296    /// Return the number of stored rows without exposing the backing map.
297    #[must_use]
298    pub(in crate::db) fn len(&self) -> u64 {
299        match &self.backend {
300            DataStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
301            DataStoreBackend::Journaled { .. } => {
302                let mut count = 0_u64;
303                let _: Result<(), Infallible> = self.visit_entries(|_key, _row| {
304                    count = count.saturating_add(1);
305                    Ok(StoreVisit::Continue)
306                });
307                count
308            }
309        }
310    }
311
312    /// Return whether the data store currently contains no rows.
313    #[must_use]
314    #[cfg(test)]
315    pub(in crate::db) fn is_empty(&self) -> bool {
316        match &self.backend {
317            DataStoreBackend::Heap(map) => map.is_empty(),
318            DataStoreBackend::Journaled { .. } => {
319                let mut empty = true;
320                let _: Result<(), Infallible> = self.visit_entries(|_key, _row| {
321                    empty = false;
322                    Ok(StoreVisit::Stop)
323                });
324                empty
325            }
326        }
327    }
328
329    /// Visit raw row entries in canonical storage order.
330    pub(in crate::db) fn visit_entries<E>(
331        &self,
332        mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
333    ) -> Result<(), E> {
334        match &self.backend {
335            DataStoreBackend::Heap(map) => {
336                for (key, row) in map {
337                    if visitor(key, row)?.should_stop() {
338                        break;
339                    }
340                }
341            }
342            DataStoreBackend::Journaled {
343                canonical: _,
344                live: _,
345                tombstones: _,
346            } => Self::visit_journaled_entries_in_bounds(
347                &self.backend,
348                (Bound::Unbounded, Bound::Unbounded),
349                false,
350                visitor,
351            )?,
352        }
353
354        Ok(())
355    }
356
357    /// Visit raw row entries in reverse canonical storage order.
358    pub(in crate::db) fn visit_entries_rev<E>(
359        &self,
360        mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
361    ) -> Result<(), E> {
362        match &self.backend {
363            DataStoreBackend::Heap(map) => {
364                for (key, row) in map.iter().rev() {
365                    if visitor(key, row)?.should_stop() {
366                        break;
367                    }
368                }
369            }
370            DataStoreBackend::Journaled {
371                canonical: _,
372                live: _,
373                tombstones: _,
374            } => Self::visit_journaled_entries_in_bounds(
375                &self.backend,
376                (Bound::Unbounded, Bound::Unbounded),
377                true,
378                visitor,
379            )?,
380        }
381
382        Ok(())
383    }
384
385    /// Visit raw row entries whose keys belong to the provided storage range.
386    pub(in crate::db) fn visit_range<E>(
387        &self,
388        key_range: impl RangeBounds<RawDataStoreKey>,
389        mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
390    ) -> Result<(), E> {
391        let bounds = Self::owned_range_bounds(&key_range);
392        match &self.backend {
393            DataStoreBackend::Heap(map) => {
394                for (key, row) in map.range((bounds.0.clone(), bounds.1)) {
395                    if visitor(key, row)?.should_stop() {
396                        break;
397                    }
398                }
399            }
400            DataStoreBackend::Journaled {
401                canonical: _,
402                live: _,
403                tombstones: _,
404            } => Self::visit_journaled_entries_in_bounds(&self.backend, bounds, false, visitor)?,
405        }
406
407        Ok(())
408    }
409
410    /// Visit raw row entries in reverse order whose keys belong to the provided storage range.
411    pub(in crate::db) fn visit_range_rev<E>(
412        &self,
413        key_range: impl RangeBounds<RawDataStoreKey>,
414        mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
415    ) -> Result<(), E> {
416        let bounds = Self::owned_range_bounds(&key_range);
417        match &self.backend {
418            DataStoreBackend::Heap(map) => {
419                for (key, row) in map.range((bounds.0.clone(), bounds.1)).rev() {
420                    if visitor(key, row)?.should_stop() {
421                        break;
422                    }
423                }
424            }
425            DataStoreBackend::Journaled {
426                canonical: _,
427                live: _,
428                tombstones: _,
429            } => Self::visit_journaled_entries_in_bounds(&self.backend, bounds, true, visitor)?,
430        }
431
432        Ok(())
433    }
434
435    /// Visit raw row entries for one entity using compact prefix bounds.
436    pub(in crate::db) fn visit_entity<E>(
437        &self,
438        entity: EntityTag,
439        visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
440    ) -> Result<(), E> {
441        let range = RawDataStoreKeyRange::entity_prefix(entity);
442        self.visit_range(RawDataStoreKey::store_range_bounds(&range), visitor)
443    }
444
445    /// Sum of bytes used by all stored rows.
446    pub(in crate::db) fn memory_bytes(&self) -> u64 {
447        // Report map footprint as key bytes + row bytes per entry.
448        let mut bytes = 0u64;
449        let _: Result<(), Infallible> = self.visit_entries(|key, row| {
450            bytes = bytes.saturating_add(key.as_bytes().len() as u64 + row.len() as u64);
451            Ok(StoreVisit::Continue)
452        });
453        bytes
454    }
455
456    /// Return the monotonic perf-only count of stable row fetches seen by this process.
457    #[cfg(feature = "diagnostics")]
458    pub(in crate::db) fn current_get_call_count() -> u64 {
459        DATA_STORE_GET_CALL_COUNT.with(Cell::get)
460    }
461
462    #[cfg(test)]
463    #[must_use]
464    pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
465        match &self.backend {
466            DataStoreBackend::Journaled { canonical: map, .. } => map.len(),
467            DataStoreBackend::Heap(_) => 0,
468        }
469    }
470
471    fn journaled_get_raw(backend: &DataStoreBackend, key: &RawDataStoreKey) -> Option<RawRow> {
472        let DataStoreBackend::Journaled {
473            canonical,
474            live,
475            tombstones,
476        } = backend
477        else {
478            return None;
479        };
480
481        if tombstones.contains(key) {
482            return None;
483        }
484        live.get(key).cloned().or_else(|| canonical.get(key))
485    }
486
487    fn owned_range_bounds(
488        key_range: &impl RangeBounds<RawDataStoreKey>,
489    ) -> (Bound<RawDataStoreKey>, Bound<RawDataStoreKey>) {
490        let lower = match key_range.start_bound() {
491            Bound::Included(key) => Bound::Included(key.clone()),
492            Bound::Excluded(key) => Bound::Excluded(key.clone()),
493            Bound::Unbounded => Bound::Unbounded,
494        };
495        let upper = match key_range.end_bound() {
496            Bound::Included(key) => Bound::Included(key.clone()),
497            Bound::Excluded(key) => Bound::Excluded(key.clone()),
498            Bound::Unbounded => Bound::Unbounded,
499        };
500
501        (lower, upper)
502    }
503
504    fn visit_journaled_entries_in_bounds<E>(
505        backend: &DataStoreBackend,
506        bounds: (Bound<RawDataStoreKey>, Bound<RawDataStoreKey>),
507        reverse: bool,
508        mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
509    ) -> Result<(), E> {
510        let DataStoreBackend::Journaled {
511            canonical,
512            live,
513            tombstones,
514        } = backend
515        else {
516            return Ok(());
517        };
518
519        if canonical.is_empty() {
520            if reverse {
521                for (key, row) in live.range(bounds).rev() {
522                    if visitor(key, row)?.should_stop() {
523                        return Ok(());
524                    }
525                }
526            } else {
527                for (key, row) in live.range(bounds) {
528                    if visitor(key, row)?.should_stop() {
529                        return Ok(());
530                    }
531                }
532            }
533            return Ok(());
534        }
535
536        if live.is_empty() && tombstones.is_empty() {
537            if reverse {
538                for entry in canonical.range(bounds).rev() {
539                    if visitor(entry.key(), &entry.value())?.should_stop() {
540                        return Ok(());
541                    }
542                }
543            } else {
544                for entry in canonical.range(bounds) {
545                    if visitor(entry.key(), &entry.value())?.should_stop() {
546                        return Ok(());
547                    }
548                }
549            }
550            return Ok(());
551        }
552
553        match if reverse {
554            Direction::Desc
555        } else {
556            Direction::Asc
557        } {
558            Direction::Asc => visit_ordered_overlay(
559                canonical.range((bounds.0.clone(), bounds.1.clone())),
560                live.range((bounds.0, bounds.1)),
561                Direction::Asc,
562                |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
563                |canonical_entry| !tombstones.contains(canonical_entry.key()),
564                |live_entry| !tombstones.contains(live_entry.0),
565                |entry| {
566                    let visit = match entry {
567                        OrderedOverlayEntry::Canonical(canonical_entry) => {
568                            visitor(canonical_entry.key(), &canonical_entry.value())?
569                        }
570                        OrderedOverlayEntry::Live((key, row)) => visitor(key, row)?,
571                    };
572                    Ok(if visit.should_stop() {
573                        OrderedOverlayVisit::Stop
574                    } else {
575                        OrderedOverlayVisit::Continue
576                    })
577                },
578            ),
579            Direction::Desc => visit_ordered_overlay(
580                canonical.range((bounds.0.clone(), bounds.1.clone())).rev(),
581                live.range((bounds.0, bounds.1)).rev(),
582                Direction::Desc,
583                |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
584                |canonical_entry| !tombstones.contains(canonical_entry.key()),
585                |live_entry| !tombstones.contains(live_entry.0),
586                |entry| {
587                    let visit = match entry {
588                        OrderedOverlayEntry::Canonical(canonical_entry) => {
589                            visitor(canonical_entry.key(), &canonical_entry.value())?
590                        }
591                        OrderedOverlayEntry::Live((key, row)) => visitor(key, row)?,
592                    };
593                    Ok(if visit.should_stop() {
594                        OrderedOverlayVisit::Stop
595                    } else {
596                        OrderedOverlayVisit::Continue
597                    })
598                },
599            ),
600        }
601    }
602}
603
// Unit tests for this module are declared out of line and compiled only in test builds.
#[cfg(test)]
mod tests;