Skip to main content

icydb_core/db/data/
store.rs

//! Module: data::store
//! Responsibility: row persistence backed by a stable or heap BTreeMap.
//! Does not own: key/row validation policy beyond type boundaries.
//! Boundary: commit/executor call into this layer after prevalidation.
5
6use crate::{
7    db::{
8        data::{CanonicalRow, RawDataStoreKey, RawRow},
9        key_taxonomy::RawDataStoreKeyRange,
10    },
11    types::EntityTag,
12};
13use ic_memory::stable_structures::{
14    BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
15};
16#[cfg(feature = "diagnostics")]
17use std::cell::Cell;
18use std::collections::BTreeMap as HeapBTreeMap;
19use std::convert::Infallible;
20use std::ops::RangeBounds;
21
#[cfg(feature = "diagnostics")]
thread_local! {
    /// Thread-local tally of `DataStore::get` invocations; only compiled in
    /// when the `diagnostics` feature is enabled.
    static DATA_STORE_GET_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
}

/// Bump the thread-local `get` counter by one, saturating at `u64::MAX`
/// instead of wrapping.
#[cfg(feature = "diagnostics")]
fn record_data_store_get_call() {
    DATA_STORE_GET_CALL_COUNT.with(|calls| {
        let bumped = calls.get().saturating_add(1);
        calls.set(bumped);
    });
}
33
34///
35/// DataStore
36///
37/// Thin persistence wrapper over one stable or heap BTreeMap.
38///
39/// Invariant: callers provide already-validated `RawDataStoreKey` and canonical row bytes.
40/// This type intentionally does not enforce commit-phase ordering.
41///
42
43pub struct DataStore {
44    backend: DataStoreBackend,
45}
46
47enum DataStoreBackend {
48    Stable(StableBTreeMap<RawDataStoreKey, RawRow, VirtualMemory<DefaultMemoryImpl>>),
49    Heap(HeapBTreeMap<RawDataStoreKey, RawRow>),
50}
51
52/// Control-flow result for store traversal visitors.
53#[derive(Clone, Copy, Debug, Eq, PartialEq)]
54pub(in crate::db) enum StoreVisit {
55    Continue,
56    Stop,
57}
58
59impl StoreVisit {
60    const fn should_stop(self) -> bool {
61        matches!(self, Self::Stop)
62    }
63}
64
65impl DataStore {
66    /// Initialize a data store with the provided backing memory.
67    #[must_use]
68    pub fn init(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
69        Self {
70            backend: DataStoreBackend::Stable(StableBTreeMap::init(memory)),
71        }
72    }
73
74    /// Initialize a volatile heap-backed data store.
75    #[must_use]
76    pub const fn init_heap() -> Self {
77        Self {
78            backend: DataStoreBackend::Heap(HeapBTreeMap::new()),
79        }
80    }
81
82    /// Return whether this data store is heap-backed and volatile.
83    #[must_use]
84    pub(in crate::db) const fn is_heap_storage(&self) -> bool {
85        matches!(self.backend, DataStoreBackend::Heap(_))
86    }
87
88    /// Insert or replace one row by raw key.
89    pub(in crate::db) fn insert(
90        &mut self,
91        key: RawDataStoreKey,
92        row: CanonicalRow,
93    ) -> Option<RawRow> {
94        let row = row.into_raw_row();
95        match &mut self.backend {
96            DataStoreBackend::Stable(map) => map.insert(key, row),
97            DataStoreBackend::Heap(map) => map.insert(key, row),
98        }
99    }
100
101    /// Insert one raw row directly for corruption-focused test setup only.
102    #[cfg(test)]
103    pub(in crate::db) fn insert_raw_for_test(
104        &mut self,
105        key: RawDataStoreKey,
106        row: RawRow,
107    ) -> Option<RawRow> {
108        match &mut self.backend {
109            DataStoreBackend::Stable(map) => map.insert(key, row),
110            DataStoreBackend::Heap(map) => map.insert(key, row),
111        }
112    }
113
114    /// Remove one row by raw key.
115    pub(in crate::db) fn remove(&mut self, key: &RawDataStoreKey) -> Option<RawRow> {
116        match &mut self.backend {
117            DataStoreBackend::Stable(map) => map.remove(key),
118            DataStoreBackend::Heap(map) => map.remove(key),
119        }
120    }
121
122    /// Load one row by raw key.
123    pub(in crate::db) fn get(&self, key: &RawDataStoreKey) -> Option<RawRow> {
124        #[cfg(feature = "diagnostics")]
125        record_data_store_get_call();
126
127        match &self.backend {
128            DataStoreBackend::Stable(map) => map.get(key),
129            DataStoreBackend::Heap(map) => map.get(key).cloned(),
130        }
131    }
132
133    /// Return whether one raw key exists without cloning the row payload.
134    #[must_use]
135    pub(in crate::db) fn contains(&self, key: &RawDataStoreKey) -> bool {
136        match &self.backend {
137            DataStoreBackend::Stable(map) => map.contains_key(key),
138            DataStoreBackend::Heap(map) => map.contains_key(key),
139        }
140    }
141
142    /// Clear all stored rows from the data store.
143    #[cfg(test)]
144    pub(in crate::db) fn clear(&mut self) {
145        match &mut self.backend {
146            DataStoreBackend::Stable(map) => map.clear_new(),
147            DataStoreBackend::Heap(map) => map.clear(),
148        }
149    }
150
151    /// Return the number of stored rows without exposing the backing map.
152    #[must_use]
153    pub(in crate::db) fn len(&self) -> u64 {
154        match &self.backend {
155            DataStoreBackend::Stable(map) => map.len(),
156            DataStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
157        }
158    }
159
160    /// Return whether the data store currently contains no rows.
161    #[must_use]
162    #[cfg(test)]
163    pub(in crate::db) fn is_empty(&self) -> bool {
164        match &self.backend {
165            DataStoreBackend::Stable(map) => map.is_empty(),
166            DataStoreBackend::Heap(map) => map.is_empty(),
167        }
168    }
169
170    /// Visit raw row entries in canonical storage order.
171    pub(in crate::db) fn visit_entries<E>(
172        &self,
173        mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
174    ) -> Result<(), E> {
175        match &self.backend {
176            DataStoreBackend::Stable(map) => {
177                for entry in map.iter() {
178                    if visitor(entry.key(), &entry.value())?.should_stop() {
179                        break;
180                    }
181                }
182            }
183            DataStoreBackend::Heap(map) => {
184                for (key, row) in map {
185                    if visitor(key, row)?.should_stop() {
186                        break;
187                    }
188                }
189            }
190        }
191
192        Ok(())
193    }
194
195    /// Visit raw row entries in reverse canonical storage order.
196    pub(in crate::db) fn visit_entries_rev<E>(
197        &self,
198        mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
199    ) -> Result<(), E> {
200        match &self.backend {
201            DataStoreBackend::Stable(map) => {
202                for entry in map.iter().rev() {
203                    if visitor(entry.key(), &entry.value())?.should_stop() {
204                        break;
205                    }
206                }
207            }
208            DataStoreBackend::Heap(map) => {
209                for (key, row) in map.iter().rev() {
210                    if visitor(key, row)?.should_stop() {
211                        break;
212                    }
213                }
214            }
215        }
216
217        Ok(())
218    }
219
220    /// Visit raw row entries whose keys belong to the provided storage range.
221    pub(in crate::db) fn visit_range<E>(
222        &self,
223        key_range: impl RangeBounds<RawDataStoreKey>,
224        mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
225    ) -> Result<(), E> {
226        match &self.backend {
227            DataStoreBackend::Stable(map) => {
228                for entry in map.range(key_range) {
229                    if visitor(entry.key(), &entry.value())?.should_stop() {
230                        break;
231                    }
232                }
233            }
234            DataStoreBackend::Heap(map) => {
235                for (key, row) in map.range(key_range) {
236                    if visitor(key, row)?.should_stop() {
237                        break;
238                    }
239                }
240            }
241        }
242
243        Ok(())
244    }
245
246    /// Visit raw row entries in reverse order whose keys belong to the provided storage range.
247    pub(in crate::db) fn visit_range_rev<E>(
248        &self,
249        key_range: impl RangeBounds<RawDataStoreKey>,
250        mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
251    ) -> Result<(), E> {
252        match &self.backend {
253            DataStoreBackend::Stable(map) => {
254                for entry in map.range(key_range).rev() {
255                    if visitor(entry.key(), &entry.value())?.should_stop() {
256                        break;
257                    }
258                }
259            }
260            DataStoreBackend::Heap(map) => {
261                for (key, row) in map.range(key_range).rev() {
262                    if visitor(key, row)?.should_stop() {
263                        break;
264                    }
265                }
266            }
267        }
268
269        Ok(())
270    }
271
272    /// Visit raw row entries for one entity using compact prefix bounds.
273    pub(in crate::db) fn visit_entity<E>(
274        &self,
275        entity: EntityTag,
276        visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
277    ) -> Result<(), E> {
278        let range = RawDataStoreKeyRange::entity_prefix(entity);
279        self.visit_range(RawDataStoreKey::store_range_bounds(&range), visitor)
280    }
281
282    /// Sum of bytes used by all stored rows.
283    pub(in crate::db) fn memory_bytes(&self) -> u64 {
284        // Report map footprint as key bytes + row bytes per entry.
285        let mut bytes = 0u64;
286        let _: Result<(), Infallible> = self.visit_entries(|key, row| {
287            bytes = bytes.saturating_add(key.as_bytes().len() as u64 + row.len() as u64);
288            Ok(StoreVisit::Continue)
289        });
290        bytes
291    }
292
293    /// Return the monotonic perf-only count of stable row fetches seen by this process.
294    #[cfg(feature = "diagnostics")]
295    pub(in crate::db) fn current_get_call_count() -> u64 {
296        DATA_STORE_GET_CALL_COUNT.with(Cell::get)
297    }
298}
299
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        db::{
            data::DecodedDataStoreKey,
            key_taxonomy::{PrimaryKeyComponent, PrimaryKeyValue},
        },
        testing::test_memory,
    };
    use std::ops::Bound;

    /// Seed tuple layout: (entity tag, primary key id, single row byte).
    type Seed = (u64, u64, u8);

    /// Encode a raw storage key for `entity` with a scalar `Nat64` primary key.
    fn raw_key(entity: u64, id: u64) -> RawDataStoreKey {
        DecodedDataStoreKey::new(
            EntityTag::new(entity),
            &PrimaryKeyValue::Scalar(PrimaryKeyComponent::Nat64(id)),
        )
        .to_raw()
        .expect("test data key should encode")
    }

    /// Build a one-byte raw row so assertions can identify rows by that byte.
    fn raw_row(value: u8) -> RawRow {
        RawRow::try_new(vec![value]).expect("test raw row should be bounded")
    }

    /// Insert every seed entry into `store`, in the order given, and return it.
    fn seed(mut store: DataStore, entries: &[Seed]) -> DataStore {
        for &(entity, id, row) in entries {
            store.insert_raw_for_test(raw_key(entity, id), raw_row(row));
        }
        store
    }

    /// Stable-memory store pre-populated with `entries`.
    fn seed_store(memory_id: u8, entries: &[Seed]) -> DataStore {
        seed(DataStore::init(test_memory(memory_id)), entries)
    }

    /// Heap store pre-populated with `entries`.
    fn seed_heap_store(entries: &[Seed]) -> DataStore {
        seed(DataStore::init_heap(), entries)
    }

    /// Collect every key in forward traversal order.
    fn collect_keys(store: &DataStore) -> Vec<RawDataStoreKey> {
        let mut keys = Vec::new();
        let _: Result<(), Infallible> = store.visit_entries(|key, _row| {
            keys.push(key.clone());
            Ok(StoreVisit::Continue)
        });
        keys
    }

    #[test]
    fn data_store_visit_entries_preserves_storage_key_order() {
        let store = seed_store(221, &[(2, 1, 21), (1, 3, 13), (1, 1, 11), (1, 2, 12)]);

        let mut expected = vec![raw_key(2, 1), raw_key(1, 3), raw_key(1, 1), raw_key(1, 2)];
        expected.sort();

        assert_eq!(collect_keys(&store), expected);
    }

    #[test]
    fn data_store_visit_range_preserves_raw_key_bounds() {
        let store = seed_store(222, &[(1, 1, 11), (1, 2, 12), (1, 3, 13), (1, 4, 14)]);

        let mut visited = Vec::new();
        let _: Result<(), Infallible> = store.visit_range(
            (
                Bound::Included(raw_key(1, 2)),
                Bound::Excluded(raw_key(1, 4)),
            ),
            |key, row| {
                visited.push((key.clone(), row.as_bytes()[0]));
                Ok(StoreVisit::Continue)
            },
        );

        assert_eq!(visited, vec![(raw_key(1, 2), 12), (raw_key(1, 3), 13)]);
    }

    #[test]
    fn data_store_visit_entity_preserves_compact_entity_prefix_bounds() {
        let store = seed_store(223, &[(2, 1, 21), (1, 2, 12), (2, 3, 23), (1, 1, 11)]);

        let mut visited = Vec::new();
        let _: Result<(), Infallible> = store.visit_entity(EntityTag::new(2), |key, row| {
            visited.push((key.clone(), row.as_bytes()[0]));
            Ok(StoreVisit::Continue)
        });

        assert_eq!(visited, vec![(raw_key(2, 1), 21), (raw_key(2, 3), 23)]);
    }

    #[test]
    fn data_store_visit_entries_can_stop_without_error() {
        let store = seed_store(224, &[(1, 1, 11), (1, 2, 12), (1, 3, 13)]);

        let mut visited = Vec::new();
        let _: Result<(), Infallible> = store.visit_entries(|key, _row| {
            visited.push(key.clone());
            Ok(if visited.len() == 2 {
                StoreVisit::Stop
            } else {
                StoreVisit::Continue
            })
        });

        assert_eq!(visited, vec![raw_key(1, 1), raw_key(1, 2)]);
    }

    #[test]
    fn heap_data_store_preserves_order_bounds_and_early_stop() {
        let store = seed_heap_store(&[(2, 1, 21), (1, 3, 13), (1, 1, 11), (1, 2, 12)]);

        let mut expected = vec![raw_key(2, 1), raw_key(1, 3), raw_key(1, 1), raw_key(1, 2)];
        expected.sort();
        assert_eq!(collect_keys(&store), expected);

        let mut ranged = Vec::new();
        let _: Result<(), Infallible> = store.visit_range(
            (
                Bound::Included(raw_key(1, 1)),
                Bound::Excluded(raw_key(1, 3)),
            ),
            |key, row| {
                ranged.push((key.clone(), row.as_bytes()[0]));
                Ok(StoreVisit::Continue)
            },
        );
        assert_eq!(ranged, vec![(raw_key(1, 1), 11), (raw_key(1, 2), 12)]);

        let mut entity = Vec::new();
        let _: Result<(), Infallible> = store.visit_entity(EntityTag::new(2), |key, row| {
            entity.push((key.clone(), row.as_bytes()[0]));
            Ok(StoreVisit::Continue)
        });
        assert_eq!(entity, vec![(raw_key(2, 1), 21)]);

        let mut stopped = Vec::new();
        let _: Result<(), Infallible> = store.visit_entries(|key, _| {
            stopped.push(key.clone());
            Ok(if stopped.len() == 2 {
                StoreVisit::Stop
            } else {
                StoreVisit::Continue
            })
        });
        assert_eq!(stopped, vec![raw_key(1, 1), raw_key(1, 2)]);
    }

    // Heap-backed so no additional stable test memory id has to be reserved.
    #[test]
    fn heap_data_store_reverse_visits_mirror_forward_order() {
        let store = seed_heap_store(&[(2, 1, 21), (1, 3, 13), (1, 1, 11), (1, 2, 12)]);

        let mut expected = collect_keys(&store);
        expected.reverse();

        let mut reversed = Vec::new();
        let _: Result<(), Infallible> = store.visit_entries_rev(|key, _row| {
            reversed.push(key.clone());
            Ok(StoreVisit::Continue)
        });
        assert_eq!(reversed, expected);

        let mut ranged = Vec::new();
        let _: Result<(), Infallible> = store.visit_range_rev(
            (
                Bound::Included(raw_key(1, 1)),
                Bound::Excluded(raw_key(1, 3)),
            ),
            |key, row| {
                ranged.push((key.clone(), row.as_bytes()[0]));
                Ok(StoreVisit::Continue)
            },
        );
        assert_eq!(ranged, vec![(raw_key(1, 2), 12), (raw_key(1, 1), 11)]);

        let mut stopped = Vec::new();
        let _: Result<(), Infallible> = store.visit_entries_rev(|key, _row| {
            stopped.push(key.clone());
            Ok(StoreVisit::Stop)
        });
        assert_eq!(stopped, vec![raw_key(2, 1)]);
    }
}