Skip to main content

icydb_core/db/data/
store.rs

1//! Module: data::store
2//! Responsibility: stable-or-heap row storage behind the data-store boundary.
3//! Does not own: key/row validation policy beyond type boundaries.
4//! Boundary: commit/executor call into this layer after prevalidation.
5
6use crate::{
7    db::{
8        data::{CanonicalRow, RawDataStoreKey, RawRow},
9        direction::Direction,
10        key_taxonomy::RawDataStoreKeyRange,
11        ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
12    },
13    types::EntityTag,
14};
15use ic_memory::stable_structures::{
16    BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
17};
18#[cfg(feature = "diagnostics")]
19use std::cell::Cell;
20use std::collections::{BTreeMap as HeapBTreeMap, BTreeSet};
21use std::convert::Infallible;
22use std::ops::{Bound, RangeBounds};
23
#[cfg(feature = "diagnostics")]
thread_local! {
    /// Per-thread tally of `DataStore::get` calls, read back through
    /// `DataStore::current_get_call_count`.
    static DATA_STORE_GET_CALL_COUNT: Cell<u64> = const { Cell::new(u64::MIN) };
}

/// Bump the diagnostics get counter by one, saturating at `u64::MAX`.
#[cfg(feature = "diagnostics")]
fn record_data_store_get_call() {
    DATA_STORE_GET_CALL_COUNT.with(|calls| {
        let next = calls.get().saturating_add(1);
        calls.set(next);
    });
}
35
36///
37/// DataStore
38///
39/// Thin persistence wrapper over one stable or heap BTreeMap.
40///
41/// Invariant: callers provide already-validated `RawDataStoreKey` and canonical row bytes.
42/// This type intentionally does not enforce commit-phase ordering.
43///
44
45pub struct DataStore {
46    backend: DataStoreBackend,
47}
48
49enum DataStoreBackend {
50    Stable(StableBTreeMap<RawDataStoreKey, RawRow, VirtualMemory<DefaultMemoryImpl>>),
51    Heap(HeapBTreeMap<RawDataStoreKey, RawRow>),
52    Journaled {
53        canonical: StableBTreeMap<RawDataStoreKey, RawRow, VirtualMemory<DefaultMemoryImpl>>,
54        live: HeapBTreeMap<RawDataStoreKey, RawRow>,
55        tombstones: BTreeSet<RawDataStoreKey>,
56    },
57}
58
59/// Control-flow result for store traversal visitors.
60#[derive(Clone, Copy, Debug, Eq, PartialEq)]
61pub(in crate::db) enum StoreVisit {
62    Continue,
63    Stop,
64}
65
66impl StoreVisit {
67    const fn should_stop(self) -> bool {
68        matches!(self, Self::Stop)
69    }
70}
71
72impl DataStore {
73    /// Initialize a data store with the provided backing memory.
74    #[must_use]
75    pub fn init(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
76        Self {
77            backend: DataStoreBackend::Stable(StableBTreeMap::init(memory)),
78        }
79    }
80
81    /// Initialize a volatile heap-backed data store.
82    #[must_use]
83    pub const fn init_heap() -> Self {
84        Self {
85            backend: DataStoreBackend::Heap(HeapBTreeMap::new()),
86        }
87    }
88
89    /// Initialize a journaled cached-stable data store.
90    ///
91    /// Normal writes update only the live projection. The canonical stable map
92    /// is the future fold target and is not mutated by this wrapper's write
93    /// methods.
94    #[must_use]
95    pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
96        Self {
97            backend: DataStoreBackend::Journaled {
98                canonical: StableBTreeMap::init(memory),
99                live: HeapBTreeMap::new(),
100                tombstones: BTreeSet::new(),
101            },
102        }
103    }
104
105    /// Insert or replace one row by raw key.
106    pub(in crate::db) fn insert(
107        &mut self,
108        key: RawDataStoreKey,
109        row: CanonicalRow,
110    ) -> Option<RawRow> {
111        let row = row.into_raw_row();
112        let previous_journaled = if matches!(self.backend, DataStoreBackend::Journaled { .. }) {
113            self.get(&key)
114        } else {
115            None
116        };
117        match &mut self.backend {
118            DataStoreBackend::Stable(map) => map.insert(key, row),
119            DataStoreBackend::Heap(map) => map.insert(key, row),
120            DataStoreBackend::Journaled {
121                live, tombstones, ..
122            } => {
123                tombstones.remove(&key);
124                live.insert(key, row);
125                previous_journaled
126            }
127        }
128    }
129
130    /// Insert one raw row directly for corruption-focused test setup only.
131    #[cfg(test)]
132    pub(in crate::db) fn insert_raw_for_test(
133        &mut self,
134        key: RawDataStoreKey,
135        row: RawRow,
136    ) -> Option<RawRow> {
137        let previous_journaled = if matches!(self.backend, DataStoreBackend::Journaled { .. }) {
138            self.get(&key)
139        } else {
140            None
141        };
142        match &mut self.backend {
143            DataStoreBackend::Stable(map) => map.insert(key, row),
144            DataStoreBackend::Heap(map) => map.insert(key, row),
145            DataStoreBackend::Journaled {
146                live, tombstones, ..
147            } => {
148                tombstones.remove(&key);
149                live.insert(key, row);
150                previous_journaled
151            }
152        }
153    }
154
155    /// Remove one row by raw key.
156    pub(in crate::db) fn remove(&mut self, key: &RawDataStoreKey) -> Option<RawRow> {
157        let previous_journaled = if matches!(self.backend, DataStoreBackend::Journaled { .. }) {
158            self.get(key)
159        } else {
160            None
161        };
162        match &mut self.backend {
163            DataStoreBackend::Stable(map) => map.remove(key),
164            DataStoreBackend::Heap(map) => map.remove(key),
165            DataStoreBackend::Journaled {
166                live, tombstones, ..
167            } => {
168                live.remove(key);
169                tombstones.insert(key.clone());
170                previous_journaled
171            }
172        }
173    }
174
175    /// Reset the volatile projection for journaled recovery without mutating
176    /// the canonical stable base.
177    pub(in crate::db) fn reset_journaled_live_projection(
178        &mut self,
179    ) -> Result<(), crate::error::InternalError> {
180        let DataStoreBackend::Journaled {
181            live, tombstones, ..
182        } = &mut self.backend
183        else {
184            return Err(crate::error::InternalError::store_invariant());
185        };
186
187        live.clear();
188        tombstones.clear();
189
190        Ok(())
191    }
192
193    /// Apply one recovered journal row put into the volatile projection.
194    pub(in crate::db) fn apply_recovered_journal_put(
195        &mut self,
196        key: RawDataStoreKey,
197        row: RawRow,
198    ) -> Result<Option<RawRow>, crate::error::InternalError> {
199        let DataStoreBackend::Journaled {
200            canonical,
201            live,
202            tombstones,
203        } = &mut self.backend
204        else {
205            return Err(crate::error::InternalError::store_invariant());
206        };
207
208        let previous = if tombstones.contains(&key) {
209            None
210        } else {
211            live.get(&key).cloned().or_else(|| canonical.get(&key))
212        };
213        tombstones.remove(&key);
214        live.insert(key, row);
215
216        Ok(previous)
217    }
218
219    /// Apply one recovered journal row delete into the volatile projection.
220    pub(in crate::db) fn apply_recovered_journal_delete(
221        &mut self,
222        key: &RawDataStoreKey,
223    ) -> Result<Option<RawRow>, crate::error::InternalError> {
224        let DataStoreBackend::Journaled {
225            canonical,
226            live,
227            tombstones,
228        } = &mut self.backend
229        else {
230            return Err(crate::error::InternalError::store_invariant());
231        };
232
233        let previous = if tombstones.contains(key) {
234            None
235        } else {
236            live.get(key).cloned().or_else(|| canonical.get(key))
237        };
238        live.remove(key);
239        tombstones.insert(key.clone());
240
241        Ok(previous)
242    }
243
244    /// Apply one folded journal row put into the canonical stable base.
245    pub(in crate::db) fn fold_recovered_journal_put(
246        &mut self,
247        key: RawDataStoreKey,
248        row: RawRow,
249    ) -> Result<Option<RawRow>, crate::error::InternalError> {
250        let DataStoreBackend::Journaled { canonical, .. } = &mut self.backend else {
251            return Err(crate::error::InternalError::store_invariant());
252        };
253
254        Ok(canonical.insert(key, row))
255    }
256
257    /// Apply one folded journal row delete into the canonical stable base.
258    pub(in crate::db) fn fold_recovered_journal_delete(
259        &mut self,
260        key: &RawDataStoreKey,
261    ) -> Result<Option<RawRow>, crate::error::InternalError> {
262        let DataStoreBackend::Journaled { canonical, .. } = &mut self.backend else {
263            return Err(crate::error::InternalError::store_invariant());
264        };
265
266        Ok(canonical.remove(key))
267    }
268
269    /// Load one row by raw key.
270    pub(in crate::db) fn get(&self, key: &RawDataStoreKey) -> Option<RawRow> {
271        #[cfg(feature = "diagnostics")]
272        record_data_store_get_call();
273
274        match &self.backend {
275            DataStoreBackend::Stable(map) => map.get(key),
276            DataStoreBackend::Heap(map) => map.get(key).cloned(),
277            DataStoreBackend::Journaled { .. } => Self::journaled_get_raw(&self.backend, key),
278        }
279    }
280
281    /// Return whether one raw key exists without cloning the row payload.
282    #[must_use]
283    pub(in crate::db) fn contains(&self, key: &RawDataStoreKey) -> bool {
284        match &self.backend {
285            DataStoreBackend::Stable(map) => map.contains_key(key),
286            DataStoreBackend::Heap(map) => map.contains_key(key),
287            DataStoreBackend::Journaled { .. } => {
288                Self::journaled_get_raw(&self.backend, key).is_some()
289            }
290        }
291    }
292
293    /// Clear all stored rows from the data store.
294    #[cfg(test)]
295    pub(in crate::db) fn clear(&mut self) {
296        match &mut self.backend {
297            DataStoreBackend::Stable(map) => map.clear_new(),
298            DataStoreBackend::Heap(map) => map.clear(),
299            DataStoreBackend::Journaled {
300                canonical,
301                live,
302                tombstones,
303            } => {
304                live.clear();
305                tombstones.clear();
306                for entry in canonical.iter() {
307                    tombstones.insert(entry.key().clone());
308                }
309            }
310        }
311    }
312
313    /// Return the number of stored rows without exposing the backing map.
314    #[must_use]
315    pub(in crate::db) fn len(&self) -> u64 {
316        match &self.backend {
317            DataStoreBackend::Stable(map) => map.len(),
318            DataStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
319            DataStoreBackend::Journaled { .. } => {
320                let mut count = 0_u64;
321                let _: Result<(), Infallible> = self.visit_entries(|_key, _row| {
322                    count = count.saturating_add(1);
323                    Ok(StoreVisit::Continue)
324                });
325                count
326            }
327        }
328    }
329
330    /// Return whether the data store currently contains no rows.
331    #[must_use]
332    #[cfg(test)]
333    pub(in crate::db) fn is_empty(&self) -> bool {
334        match &self.backend {
335            DataStoreBackend::Stable(map) => map.is_empty(),
336            DataStoreBackend::Heap(map) => map.is_empty(),
337            DataStoreBackend::Journaled { .. } => {
338                let mut empty = true;
339                let _: Result<(), Infallible> = self.visit_entries(|_key, _row| {
340                    empty = false;
341                    Ok(StoreVisit::Stop)
342                });
343                empty
344            }
345        }
346    }
347
348    /// Visit raw row entries in canonical storage order.
349    pub(in crate::db) fn visit_entries<E>(
350        &self,
351        mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
352    ) -> Result<(), E> {
353        match &self.backend {
354            DataStoreBackend::Stable(map) => {
355                for entry in map.iter() {
356                    if visitor(entry.key(), &entry.value())?.should_stop() {
357                        break;
358                    }
359                }
360            }
361            DataStoreBackend::Heap(map) => {
362                for (key, row) in map {
363                    if visitor(key, row)?.should_stop() {
364                        break;
365                    }
366                }
367            }
368            DataStoreBackend::Journaled {
369                canonical: _,
370                live: _,
371                tombstones: _,
372            } => Self::visit_journaled_entries_in_bounds(
373                &self.backend,
374                (Bound::Unbounded, Bound::Unbounded),
375                false,
376                visitor,
377            )?,
378        }
379
380        Ok(())
381    }
382
383    /// Visit raw row entries in reverse canonical storage order.
384    pub(in crate::db) fn visit_entries_rev<E>(
385        &self,
386        mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
387    ) -> Result<(), E> {
388        match &self.backend {
389            DataStoreBackend::Stable(map) => {
390                for entry in map.iter().rev() {
391                    if visitor(entry.key(), &entry.value())?.should_stop() {
392                        break;
393                    }
394                }
395            }
396            DataStoreBackend::Heap(map) => {
397                for (key, row) in map.iter().rev() {
398                    if visitor(key, row)?.should_stop() {
399                        break;
400                    }
401                }
402            }
403            DataStoreBackend::Journaled {
404                canonical: _,
405                live: _,
406                tombstones: _,
407            } => Self::visit_journaled_entries_in_bounds(
408                &self.backend,
409                (Bound::Unbounded, Bound::Unbounded),
410                true,
411                visitor,
412            )?,
413        }
414
415        Ok(())
416    }
417
418    /// Visit raw row entries whose keys belong to the provided storage range.
419    pub(in crate::db) fn visit_range<E>(
420        &self,
421        key_range: impl RangeBounds<RawDataStoreKey>,
422        mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
423    ) -> Result<(), E> {
424        let bounds = Self::owned_range_bounds(&key_range);
425        match &self.backend {
426            DataStoreBackend::Stable(map) => {
427                for entry in map.range((bounds.0.clone(), bounds.1)) {
428                    if visitor(entry.key(), &entry.value())?.should_stop() {
429                        break;
430                    }
431                }
432            }
433            DataStoreBackend::Heap(map) => {
434                for (key, row) in map.range((bounds.0.clone(), bounds.1)) {
435                    if visitor(key, row)?.should_stop() {
436                        break;
437                    }
438                }
439            }
440            DataStoreBackend::Journaled {
441                canonical: _,
442                live: _,
443                tombstones: _,
444            } => Self::visit_journaled_entries_in_bounds(&self.backend, bounds, false, visitor)?,
445        }
446
447        Ok(())
448    }
449
450    /// Visit raw row entries in reverse order whose keys belong to the provided storage range.
451    pub(in crate::db) fn visit_range_rev<E>(
452        &self,
453        key_range: impl RangeBounds<RawDataStoreKey>,
454        mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
455    ) -> Result<(), E> {
456        let bounds = Self::owned_range_bounds(&key_range);
457        match &self.backend {
458            DataStoreBackend::Stable(map) => {
459                for entry in map.range((bounds.0.clone(), bounds.1)).rev() {
460                    if visitor(entry.key(), &entry.value())?.should_stop() {
461                        break;
462                    }
463                }
464            }
465            DataStoreBackend::Heap(map) => {
466                for (key, row) in map.range((bounds.0.clone(), bounds.1)).rev() {
467                    if visitor(key, row)?.should_stop() {
468                        break;
469                    }
470                }
471            }
472            DataStoreBackend::Journaled {
473                canonical: _,
474                live: _,
475                tombstones: _,
476            } => Self::visit_journaled_entries_in_bounds(&self.backend, bounds, true, visitor)?,
477        }
478
479        Ok(())
480    }
481
482    /// Visit raw row entries for one entity using compact prefix bounds.
483    pub(in crate::db) fn visit_entity<E>(
484        &self,
485        entity: EntityTag,
486        visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
487    ) -> Result<(), E> {
488        let range = RawDataStoreKeyRange::entity_prefix(entity);
489        self.visit_range(RawDataStoreKey::store_range_bounds(&range), visitor)
490    }
491
492    /// Sum of bytes used by all stored rows.
493    pub(in crate::db) fn memory_bytes(&self) -> u64 {
494        // Report map footprint as key bytes + row bytes per entry.
495        let mut bytes = 0u64;
496        let _: Result<(), Infallible> = self.visit_entries(|key, row| {
497            bytes = bytes.saturating_add(key.as_bytes().len() as u64 + row.len() as u64);
498            Ok(StoreVisit::Continue)
499        });
500        bytes
501    }
502
503    /// Return the monotonic perf-only count of stable row fetches seen by this process.
504    #[cfg(feature = "diagnostics")]
505    pub(in crate::db) fn current_get_call_count() -> u64 {
506        DATA_STORE_GET_CALL_COUNT.with(Cell::get)
507    }
508
509    #[cfg(test)]
510    #[must_use]
511    pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
512        match &self.backend {
513            DataStoreBackend::Stable(map) | DataStoreBackend::Journaled { canonical: map, .. } => {
514                map.len()
515            }
516            DataStoreBackend::Heap(_) => 0,
517        }
518    }
519
520    fn journaled_get_raw(backend: &DataStoreBackend, key: &RawDataStoreKey) -> Option<RawRow> {
521        let DataStoreBackend::Journaled {
522            canonical,
523            live,
524            tombstones,
525        } = backend
526        else {
527            return None;
528        };
529
530        if tombstones.contains(key) {
531            return None;
532        }
533        live.get(key).cloned().or_else(|| canonical.get(key))
534    }
535
536    fn owned_range_bounds(
537        key_range: &impl RangeBounds<RawDataStoreKey>,
538    ) -> (Bound<RawDataStoreKey>, Bound<RawDataStoreKey>) {
539        let lower = match key_range.start_bound() {
540            Bound::Included(key) => Bound::Included(key.clone()),
541            Bound::Excluded(key) => Bound::Excluded(key.clone()),
542            Bound::Unbounded => Bound::Unbounded,
543        };
544        let upper = match key_range.end_bound() {
545            Bound::Included(key) => Bound::Included(key.clone()),
546            Bound::Excluded(key) => Bound::Excluded(key.clone()),
547            Bound::Unbounded => Bound::Unbounded,
548        };
549
550        (lower, upper)
551    }
552
553    fn visit_journaled_entries_in_bounds<E>(
554        backend: &DataStoreBackend,
555        bounds: (Bound<RawDataStoreKey>, Bound<RawDataStoreKey>),
556        reverse: bool,
557        mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
558    ) -> Result<(), E> {
559        let DataStoreBackend::Journaled {
560            canonical,
561            live,
562            tombstones,
563        } = backend
564        else {
565            return Ok(());
566        };
567
568        if canonical.is_empty() {
569            if reverse {
570                for (key, row) in live.range(bounds).rev() {
571                    if visitor(key, row)?.should_stop() {
572                        return Ok(());
573                    }
574                }
575            } else {
576                for (key, row) in live.range(bounds) {
577                    if visitor(key, row)?.should_stop() {
578                        return Ok(());
579                    }
580                }
581            }
582            return Ok(());
583        }
584
585        if live.is_empty() && tombstones.is_empty() {
586            if reverse {
587                for entry in canonical.range(bounds).rev() {
588                    if visitor(entry.key(), &entry.value())?.should_stop() {
589                        return Ok(());
590                    }
591                }
592            } else {
593                for entry in canonical.range(bounds) {
594                    if visitor(entry.key(), &entry.value())?.should_stop() {
595                        return Ok(());
596                    }
597                }
598            }
599            return Ok(());
600        }
601
602        match if reverse {
603            Direction::Desc
604        } else {
605            Direction::Asc
606        } {
607            Direction::Asc => visit_ordered_overlay(
608                canonical.range((bounds.0.clone(), bounds.1.clone())),
609                live.range((bounds.0, bounds.1)),
610                Direction::Asc,
611                |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
612                |canonical_entry| !tombstones.contains(canonical_entry.key()),
613                |live_entry| !tombstones.contains(live_entry.0),
614                |entry| {
615                    let visit = match entry {
616                        OrderedOverlayEntry::Canonical(canonical_entry) => {
617                            visitor(canonical_entry.key(), &canonical_entry.value())?
618                        }
619                        OrderedOverlayEntry::Live((key, row)) => visitor(key, row)?,
620                    };
621                    Ok(if visit.should_stop() {
622                        OrderedOverlayVisit::Stop
623                    } else {
624                        OrderedOverlayVisit::Continue
625                    })
626                },
627            ),
628            Direction::Desc => visit_ordered_overlay(
629                canonical.range((bounds.0.clone(), bounds.1.clone())).rev(),
630                live.range((bounds.0, bounds.1)).rev(),
631                Direction::Desc,
632                |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
633                |canonical_entry| !tombstones.contains(canonical_entry.key()),
634                |live_entry| !tombstones.contains(live_entry.0),
635                |entry| {
636                    let visit = match entry {
637                        OrderedOverlayEntry::Canonical(canonical_entry) => {
638                            visitor(canonical_entry.key(), &canonical_entry.value())?
639                        }
640                        OrderedOverlayEntry::Live((key, row)) => visitor(key, row)?,
641                    };
642                    Ok(if visit.should_stop() {
643                        OrderedOverlayVisit::Stop
644                    } else {
645                        OrderedOverlayVisit::Continue
646                    })
647                },
648            ),
649        }
650    }
651}
652
653#[cfg(test)]
654mod tests;