Skip to main content

icydb_core/db/data/
store.rs

1//! Module: data::store
2//! Responsibility: stable-or-heap row storage behind the data-store boundary.
3//! Does not own: key/row validation policy beyond type boundaries.
4//! Boundary: commit/executor call into this layer after prevalidation.
5
6use crate::{
7    db::{
8        data::{CanonicalRow, RawDataStoreKey, RawRow},
9        direction::Direction,
10        key_taxonomy::RawDataStoreKeyRange,
11        ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
12    },
13    types::EntityTag,
14};
15use ic_memory::stable_structures::{
16    BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
17};
18#[cfg(feature = "diagnostics")]
19use std::cell::Cell;
20use std::collections::{BTreeMap as HeapBTreeMap, BTreeSet};
21use std::convert::Infallible;
22use std::ops::{Bound, RangeBounds};
23
#[cfg(feature = "diagnostics")]
thread_local! {
    /// Per-thread tally of `DataStore::get` invocations (diagnostics builds only).
    static DATA_STORE_GET_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
}

/// Bump the per-thread `DataStore::get` tally, saturating at `u64::MAX`.
#[cfg(feature = "diagnostics")]
fn record_data_store_get_call() {
    DATA_STORE_GET_CALL_COUNT.with(|tally| tally.set(tally.get().saturating_add(1)));
}
35
36///
37/// DataStore
38///
39/// Thin persistence wrapper over one stable or heap BTreeMap.
40///
41/// Invariant: callers provide already-validated `RawDataStoreKey` and canonical row bytes.
42/// This type intentionally does not enforce commit-phase ordering.
43///
44
45pub struct DataStore {
46    backend: DataStoreBackend,
47}
48
/// Storage layout selected when the `DataStore` is constructed.
enum DataStoreBackend {
    /// Rows read and written directly in a stable-memory map.
    Stable(StableBTreeMap<RawDataStoreKey, RawRow, VirtualMemory<DefaultMemoryImpl>>),
    /// Volatile rows held only in a heap map.
    Heap(HeapBTreeMap<RawDataStoreKey, RawRow>),
    /// Canonical stable base plus a volatile overlay.
    ///
    /// Lookups resolve as: a key in `tombstones` is absent; otherwise a `live`
    /// row wins over a `canonical` row. Every mutator in this file keeps
    /// `live` and `tombstones` key-disjoint.
    Journaled {
        /// Stable base; only the `fold_recovered_journal_*` methods write it.
        canonical: StableBTreeMap<RawDataStoreKey, RawRow, VirtualMemory<DefaultMemoryImpl>>,
        /// Heap rows written via `insert` or journal replay; emptied by
        /// `reset_journaled_live_projection`.
        live: HeapBTreeMap<RawDataStoreKey, RawRow>,
        /// Keys deleted in the overlay; they hide any `canonical` entry.
        tombstones: BTreeSet<RawDataStoreKey>,
    },
}
58
59/// Control-flow result for store traversal visitors.
60#[derive(Clone, Copy, Debug, Eq, PartialEq)]
61pub(in crate::db) enum StoreVisit {
62    Continue,
63    Stop,
64}
65
66impl StoreVisit {
67    const fn should_stop(self) -> bool {
68        matches!(self, Self::Stop)
69    }
70}
71
72impl DataStore {
73    /// Initialize a data store with the provided backing memory.
74    #[must_use]
75    pub fn init(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
76        Self {
77            backend: DataStoreBackend::Stable(StableBTreeMap::init(memory)),
78        }
79    }
80
81    /// Initialize a volatile heap-backed data store.
82    #[must_use]
83    pub const fn init_heap() -> Self {
84        Self {
85            backend: DataStoreBackend::Heap(HeapBTreeMap::new()),
86        }
87    }
88
89    /// Initialize a journaled cached-stable data store.
90    ///
91    /// Normal writes update only the live projection. The canonical stable map
92    /// is the future fold target and is not mutated by this wrapper's write
93    /// methods.
94    #[must_use]
95    pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
96        Self {
97            backend: DataStoreBackend::Journaled {
98                canonical: StableBTreeMap::init(memory),
99                live: HeapBTreeMap::new(),
100                tombstones: BTreeSet::new(),
101            },
102        }
103    }
104
105    /// Insert or replace one row by raw key.
106    pub(in crate::db) fn insert(
107        &mut self,
108        key: RawDataStoreKey,
109        row: CanonicalRow,
110    ) -> Option<RawRow> {
111        let row = row.into_raw_row();
112        let previous_journaled = if matches!(self.backend, DataStoreBackend::Journaled { .. }) {
113            self.get(&key)
114        } else {
115            None
116        };
117        match &mut self.backend {
118            DataStoreBackend::Stable(map) => map.insert(key, row),
119            DataStoreBackend::Heap(map) => map.insert(key, row),
120            DataStoreBackend::Journaled {
121                live, tombstones, ..
122            } => {
123                tombstones.remove(&key);
124                live.insert(key, row);
125                previous_journaled
126            }
127        }
128    }
129
130    /// Insert one raw row directly for corruption-focused test setup only.
131    #[cfg(test)]
132    pub(in crate::db) fn insert_raw_for_test(
133        &mut self,
134        key: RawDataStoreKey,
135        row: RawRow,
136    ) -> Option<RawRow> {
137        let previous_journaled = if matches!(self.backend, DataStoreBackend::Journaled { .. }) {
138            self.get(&key)
139        } else {
140            None
141        };
142        match &mut self.backend {
143            DataStoreBackend::Stable(map) => map.insert(key, row),
144            DataStoreBackend::Heap(map) => map.insert(key, row),
145            DataStoreBackend::Journaled {
146                live, tombstones, ..
147            } => {
148                tombstones.remove(&key);
149                live.insert(key, row);
150                previous_journaled
151            }
152        }
153    }
154
155    /// Remove one row by raw key.
156    pub(in crate::db) fn remove(&mut self, key: &RawDataStoreKey) -> Option<RawRow> {
157        let previous_journaled = if matches!(self.backend, DataStoreBackend::Journaled { .. }) {
158            self.get(key)
159        } else {
160            None
161        };
162        match &mut self.backend {
163            DataStoreBackend::Stable(map) => map.remove(key),
164            DataStoreBackend::Heap(map) => map.remove(key),
165            DataStoreBackend::Journaled {
166                live, tombstones, ..
167            } => {
168                live.remove(key);
169                tombstones.insert(key.clone());
170                previous_journaled
171            }
172        }
173    }
174
175    /// Reset the volatile projection for journaled recovery without mutating
176    /// the canonical stable base.
177    pub(in crate::db) fn reset_journaled_live_projection(
178        &mut self,
179    ) -> Result<(), crate::error::InternalError> {
180        let DataStoreBackend::Journaled {
181            live, tombstones, ..
182        } = &mut self.backend
183        else {
184            return Err(crate::error::InternalError::store_invariant(
185                "journaled live projection reset requires a journaled data store",
186            ));
187        };
188
189        live.clear();
190        tombstones.clear();
191
192        Ok(())
193    }
194
195    /// Apply one recovered journal row put into the volatile projection.
196    pub(in crate::db) fn apply_recovered_journal_put(
197        &mut self,
198        key: RawDataStoreKey,
199        row: RawRow,
200    ) -> Result<Option<RawRow>, crate::error::InternalError> {
201        let DataStoreBackend::Journaled {
202            canonical,
203            live,
204            tombstones,
205        } = &mut self.backend
206        else {
207            return Err(crate::error::InternalError::store_invariant(
208                "journal row replay requires a journaled data store",
209            ));
210        };
211
212        let previous = if tombstones.contains(&key) {
213            None
214        } else {
215            live.get(&key).cloned().or_else(|| canonical.get(&key))
216        };
217        tombstones.remove(&key);
218        live.insert(key, row);
219
220        Ok(previous)
221    }
222
223    /// Apply one recovered journal row delete into the volatile projection.
224    pub(in crate::db) fn apply_recovered_journal_delete(
225        &mut self,
226        key: &RawDataStoreKey,
227    ) -> Result<Option<RawRow>, crate::error::InternalError> {
228        let DataStoreBackend::Journaled {
229            canonical,
230            live,
231            tombstones,
232        } = &mut self.backend
233        else {
234            return Err(crate::error::InternalError::store_invariant(
235                "journal row replay requires a journaled data store",
236            ));
237        };
238
239        let previous = if tombstones.contains(key) {
240            None
241        } else {
242            live.get(key).cloned().or_else(|| canonical.get(key))
243        };
244        live.remove(key);
245        tombstones.insert(key.clone());
246
247        Ok(previous)
248    }
249
250    /// Apply one folded journal row put into the canonical stable base.
251    pub(in crate::db) fn fold_recovered_journal_put(
252        &mut self,
253        key: RawDataStoreKey,
254        row: RawRow,
255    ) -> Result<Option<RawRow>, crate::error::InternalError> {
256        let DataStoreBackend::Journaled { canonical, .. } = &mut self.backend else {
257            return Err(crate::error::InternalError::store_invariant(
258                "journal row fold requires a journaled data store",
259            ));
260        };
261
262        Ok(canonical.insert(key, row))
263    }
264
265    /// Apply one folded journal row delete into the canonical stable base.
266    pub(in crate::db) fn fold_recovered_journal_delete(
267        &mut self,
268        key: &RawDataStoreKey,
269    ) -> Result<Option<RawRow>, crate::error::InternalError> {
270        let DataStoreBackend::Journaled { canonical, .. } = &mut self.backend else {
271            return Err(crate::error::InternalError::store_invariant(
272                "journal row fold requires a journaled data store",
273            ));
274        };
275
276        Ok(canonical.remove(key))
277    }
278
279    /// Load one row by raw key.
280    pub(in crate::db) fn get(&self, key: &RawDataStoreKey) -> Option<RawRow> {
281        #[cfg(feature = "diagnostics")]
282        record_data_store_get_call();
283
284        match &self.backend {
285            DataStoreBackend::Stable(map) => map.get(key),
286            DataStoreBackend::Heap(map) => map.get(key).cloned(),
287            DataStoreBackend::Journaled { .. } => Self::journaled_get_raw(&self.backend, key),
288        }
289    }
290
291    /// Return whether one raw key exists without cloning the row payload.
292    #[must_use]
293    pub(in crate::db) fn contains(&self, key: &RawDataStoreKey) -> bool {
294        match &self.backend {
295            DataStoreBackend::Stable(map) => map.contains_key(key),
296            DataStoreBackend::Heap(map) => map.contains_key(key),
297            DataStoreBackend::Journaled { .. } => {
298                Self::journaled_get_raw(&self.backend, key).is_some()
299            }
300        }
301    }
302
303    /// Clear all stored rows from the data store.
304    #[cfg(test)]
305    pub(in crate::db) fn clear(&mut self) {
306        match &mut self.backend {
307            DataStoreBackend::Stable(map) => map.clear_new(),
308            DataStoreBackend::Heap(map) => map.clear(),
309            DataStoreBackend::Journaled {
310                canonical,
311                live,
312                tombstones,
313            } => {
314                live.clear();
315                tombstones.clear();
316                for entry in canonical.iter() {
317                    tombstones.insert(entry.key().clone());
318                }
319            }
320        }
321    }
322
323    /// Return the number of stored rows without exposing the backing map.
324    #[must_use]
325    pub(in crate::db) fn len(&self) -> u64 {
326        match &self.backend {
327            DataStoreBackend::Stable(map) => map.len(),
328            DataStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
329            DataStoreBackend::Journaled { .. } => {
330                let mut count = 0_u64;
331                let _: Result<(), Infallible> = self.visit_entries(|_key, _row| {
332                    count = count.saturating_add(1);
333                    Ok(StoreVisit::Continue)
334                });
335                count
336            }
337        }
338    }
339
340    /// Return whether the data store currently contains no rows.
341    #[must_use]
342    #[cfg(test)]
343    pub(in crate::db) fn is_empty(&self) -> bool {
344        match &self.backend {
345            DataStoreBackend::Stable(map) => map.is_empty(),
346            DataStoreBackend::Heap(map) => map.is_empty(),
347            DataStoreBackend::Journaled { .. } => {
348                let mut empty = true;
349                let _: Result<(), Infallible> = self.visit_entries(|_key, _row| {
350                    empty = false;
351                    Ok(StoreVisit::Stop)
352                });
353                empty
354            }
355        }
356    }
357
358    /// Visit raw row entries in canonical storage order.
359    pub(in crate::db) fn visit_entries<E>(
360        &self,
361        mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
362    ) -> Result<(), E> {
363        match &self.backend {
364            DataStoreBackend::Stable(map) => {
365                for entry in map.iter() {
366                    if visitor(entry.key(), &entry.value())?.should_stop() {
367                        break;
368                    }
369                }
370            }
371            DataStoreBackend::Heap(map) => {
372                for (key, row) in map {
373                    if visitor(key, row)?.should_stop() {
374                        break;
375                    }
376                }
377            }
378            DataStoreBackend::Journaled {
379                canonical: _,
380                live: _,
381                tombstones: _,
382            } => Self::visit_journaled_entries_in_bounds(
383                &self.backend,
384                (Bound::Unbounded, Bound::Unbounded),
385                false,
386                visitor,
387            )?,
388        }
389
390        Ok(())
391    }
392
393    /// Visit raw row entries in reverse canonical storage order.
394    pub(in crate::db) fn visit_entries_rev<E>(
395        &self,
396        mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
397    ) -> Result<(), E> {
398        match &self.backend {
399            DataStoreBackend::Stable(map) => {
400                for entry in map.iter().rev() {
401                    if visitor(entry.key(), &entry.value())?.should_stop() {
402                        break;
403                    }
404                }
405            }
406            DataStoreBackend::Heap(map) => {
407                for (key, row) in map.iter().rev() {
408                    if visitor(key, row)?.should_stop() {
409                        break;
410                    }
411                }
412            }
413            DataStoreBackend::Journaled {
414                canonical: _,
415                live: _,
416                tombstones: _,
417            } => Self::visit_journaled_entries_in_bounds(
418                &self.backend,
419                (Bound::Unbounded, Bound::Unbounded),
420                true,
421                visitor,
422            )?,
423        }
424
425        Ok(())
426    }
427
428    /// Visit raw row entries whose keys belong to the provided storage range.
429    pub(in crate::db) fn visit_range<E>(
430        &self,
431        key_range: impl RangeBounds<RawDataStoreKey>,
432        mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
433    ) -> Result<(), E> {
434        let bounds = Self::owned_range_bounds(&key_range);
435        match &self.backend {
436            DataStoreBackend::Stable(map) => {
437                for entry in map.range((bounds.0.clone(), bounds.1)) {
438                    if visitor(entry.key(), &entry.value())?.should_stop() {
439                        break;
440                    }
441                }
442            }
443            DataStoreBackend::Heap(map) => {
444                for (key, row) in map.range((bounds.0.clone(), bounds.1)) {
445                    if visitor(key, row)?.should_stop() {
446                        break;
447                    }
448                }
449            }
450            DataStoreBackend::Journaled {
451                canonical: _,
452                live: _,
453                tombstones: _,
454            } => Self::visit_journaled_entries_in_bounds(&self.backend, bounds, false, visitor)?,
455        }
456
457        Ok(())
458    }
459
460    /// Visit raw row entries in reverse order whose keys belong to the provided storage range.
461    pub(in crate::db) fn visit_range_rev<E>(
462        &self,
463        key_range: impl RangeBounds<RawDataStoreKey>,
464        mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
465    ) -> Result<(), E> {
466        let bounds = Self::owned_range_bounds(&key_range);
467        match &self.backend {
468            DataStoreBackend::Stable(map) => {
469                for entry in map.range((bounds.0.clone(), bounds.1)).rev() {
470                    if visitor(entry.key(), &entry.value())?.should_stop() {
471                        break;
472                    }
473                }
474            }
475            DataStoreBackend::Heap(map) => {
476                for (key, row) in map.range((bounds.0.clone(), bounds.1)).rev() {
477                    if visitor(key, row)?.should_stop() {
478                        break;
479                    }
480                }
481            }
482            DataStoreBackend::Journaled {
483                canonical: _,
484                live: _,
485                tombstones: _,
486            } => Self::visit_journaled_entries_in_bounds(&self.backend, bounds, true, visitor)?,
487        }
488
489        Ok(())
490    }
491
492    /// Visit raw row entries for one entity using compact prefix bounds.
493    pub(in crate::db) fn visit_entity<E>(
494        &self,
495        entity: EntityTag,
496        visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
497    ) -> Result<(), E> {
498        let range = RawDataStoreKeyRange::entity_prefix(entity);
499        self.visit_range(RawDataStoreKey::store_range_bounds(&range), visitor)
500    }
501
502    /// Sum of bytes used by all stored rows.
503    pub(in crate::db) fn memory_bytes(&self) -> u64 {
504        // Report map footprint as key bytes + row bytes per entry.
505        let mut bytes = 0u64;
506        let _: Result<(), Infallible> = self.visit_entries(|key, row| {
507            bytes = bytes.saturating_add(key.as_bytes().len() as u64 + row.len() as u64);
508            Ok(StoreVisit::Continue)
509        });
510        bytes
511    }
512
513    /// Return the monotonic perf-only count of stable row fetches seen by this process.
514    #[cfg(feature = "diagnostics")]
515    pub(in crate::db) fn current_get_call_count() -> u64 {
516        DATA_STORE_GET_CALL_COUNT.with(Cell::get)
517    }
518
519    #[cfg(test)]
520    #[must_use]
521    pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
522        match &self.backend {
523            DataStoreBackend::Stable(map) | DataStoreBackend::Journaled { canonical: map, .. } => {
524                map.len()
525            }
526            DataStoreBackend::Heap(_) => 0,
527        }
528    }
529
530    fn journaled_get_raw(backend: &DataStoreBackend, key: &RawDataStoreKey) -> Option<RawRow> {
531        let DataStoreBackend::Journaled {
532            canonical,
533            live,
534            tombstones,
535        } = backend
536        else {
537            return None;
538        };
539
540        if tombstones.contains(key) {
541            return None;
542        }
543        live.get(key).cloned().or_else(|| canonical.get(key))
544    }
545
546    fn owned_range_bounds(
547        key_range: &impl RangeBounds<RawDataStoreKey>,
548    ) -> (Bound<RawDataStoreKey>, Bound<RawDataStoreKey>) {
549        let lower = match key_range.start_bound() {
550            Bound::Included(key) => Bound::Included(key.clone()),
551            Bound::Excluded(key) => Bound::Excluded(key.clone()),
552            Bound::Unbounded => Bound::Unbounded,
553        };
554        let upper = match key_range.end_bound() {
555            Bound::Included(key) => Bound::Included(key.clone()),
556            Bound::Excluded(key) => Bound::Excluded(key.clone()),
557            Bound::Unbounded => Bound::Unbounded,
558        };
559
560        (lower, upper)
561    }
562
    /// Walk the journaled view restricted to `bounds`, ascending, or descending
    /// when `reverse` is true.
    ///
    /// The view is `canonical` overlaid by `live`, with keys in `tombstones`
    /// filtered out. The walk ends early when `visitor` returns
    /// `StoreVisit::Stop`. In the fast paths, the first visitor error is
    /// returned immediately; in the merge path, errors go through
    /// `visit_ordered_overlay` (presumably propagated unchanged — confirm).
    /// A non-journaled `backend` visits nothing and yields `Ok(())`.
    fn visit_journaled_entries_in_bounds<E>(
        backend: &DataStoreBackend,
        bounds: (Bound<RawDataStoreKey>, Bound<RawDataStoreKey>),
        reverse: bool,
        mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
    ) -> Result<(), E> {
        let DataStoreBackend::Journaled {
            canonical,
            live,
            tombstones,
        } = backend
        else {
            return Ok(());
        };

        // Fast path: with no canonical rows, the view is `live` alone.
        // Tombstones are not checked here because the mutators in this file
        // never leave one key in both `live` and `tombstones`.
        if canonical.is_empty() {
            if reverse {
                for (key, row) in live.range(bounds).rev() {
                    if visitor(key, row)?.should_stop() {
                        return Ok(());
                    }
                }
            } else {
                for (key, row) in live.range(bounds) {
                    if visitor(key, row)?.should_stop() {
                        return Ok(());
                    }
                }
            }
            return Ok(());
        }

        // Fast path: an empty overlay means the view is exactly `canonical`.
        if live.is_empty() && tombstones.is_empty() {
            if reverse {
                for entry in canonical.range(bounds).rev() {
                    if visitor(entry.key(), &entry.value())?.should_stop() {
                        return Ok(());
                    }
                }
            } else {
                for entry in canonical.range(bounds) {
                    if visitor(entry.key(), &entry.value())?.should_stop() {
                        return Ok(());
                    }
                }
            }
            return Ok(());
        }

        // General case: merge the two ordered ranges. The arms are written out
        // separately because forward and reversed ranges are distinct iterator
        // types. NOTE(review): which side wins on equal keys (presumably `live`
        // shadows `canonical`) is decided inside `visit_ordered_overlay` —
        // confirm in `ordered_overlay`.
        match if reverse {
            Direction::Desc
        } else {
            Direction::Asc
        } {
            Direction::Asc => visit_ordered_overlay(
                canonical.range((bounds.0.clone(), bounds.1.clone())),
                live.range((bounds.0, bounds.1)),
                Direction::Asc,
                // Order a canonical head against a live head by key.
                |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
                // Presumably keep-predicates: skip tombstoned keys on each side.
                |canonical_entry| !tombstones.contains(canonical_entry.key()),
                |live_entry| !tombstones.contains(live_entry.0),
                // Adapt the caller's visitor to the overlay helper's verdict type.
                |entry| {
                    let visit = match entry {
                        OrderedOverlayEntry::Canonical(canonical_entry) => {
                            visitor(canonical_entry.key(), &canonical_entry.value())?
                        }
                        OrderedOverlayEntry::Live((key, row)) => visitor(key, row)?,
                    };
                    Ok(if visit.should_stop() {
                        OrderedOverlayVisit::Stop
                    } else {
                        OrderedOverlayVisit::Continue
                    })
                },
            ),
            Direction::Desc => visit_ordered_overlay(
                canonical.range((bounds.0.clone(), bounds.1.clone())).rev(),
                live.range((bounds.0, bounds.1)).rev(),
                Direction::Desc,
                // Same key comparison; the helper is told the direction separately.
                |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
                |canonical_entry| !tombstones.contains(canonical_entry.key()),
                |live_entry| !tombstones.contains(live_entry.0),
                |entry| {
                    let visit = match entry {
                        OrderedOverlayEntry::Canonical(canonical_entry) => {
                            visitor(canonical_entry.key(), &canonical_entry.value())?
                        }
                        OrderedOverlayEntry::Live((key, row)) => visitor(key, row)?,
                    };
                    Ok(if visit.should_stop() {
                        OrderedOverlayVisit::Stop
                    } else {
                        OrderedOverlayVisit::Continue
                    })
                },
            ),
        }
    }
661}
662
663#[cfg(test)]
664mod tests;