Skip to main content

icydb_core/db/data/
store.rs

1//! Module: data::store
2//! Responsibility: stable-or-heap row storage behind the data-store boundary.
3//! Does not own: key/row validation policy beyond type boundaries.
4//! Boundary: commit/executor call into this layer after prevalidation.
5
6use crate::{
7    db::{
8        data::{CanonicalRow, RawDataStoreKey, RawRow},
9        key_taxonomy::RawDataStoreKeyRange,
10    },
11    types::EntityTag,
12};
13use ic_memory::stable_structures::{
14    BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
15};
16#[cfg(feature = "diagnostics")]
17use std::cell::Cell;
18use std::collections::BTreeMap as HeapBTreeMap;
19use std::convert::Infallible;
20use std::ops::RangeBounds;
21
#[cfg(feature = "diagnostics")]
thread_local! {
    /// Thread-local tally of `DataStore::get` invocations.
    ///
    /// Only compiled when the `diagnostics` feature is enabled; starts at zero.
    static DATA_STORE_GET_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
}
26
/// Bump the thread-local `get` counter by one.
///
/// The addition saturates, so the counter pins at `u64::MAX` instead of wrapping.
#[cfg(feature = "diagnostics")]
fn record_data_store_get_call() {
    DATA_STORE_GET_CALL_COUNT.with(|counter| {
        let next = counter.get().saturating_add(1);
        counter.set(next);
    });
}
33
34///
35/// DataStore
36///
37/// Thin persistence wrapper over one stable or heap BTreeMap.
38///
39/// Invariant: callers provide already-validated `RawDataStoreKey` and canonical row bytes.
40/// This type intentionally does not enforce commit-phase ordering.
41///
42
43pub struct DataStore {
44    backend: DataStoreBackend,
45}
46
47enum DataStoreBackend {
48    Stable(StableBTreeMap<RawDataStoreKey, RawRow, VirtualMemory<DefaultMemoryImpl>>),
49    Heap(HeapBTreeMap<RawDataStoreKey, RawRow>),
50}
51
52/// Control-flow result for store traversal visitors.
53#[derive(Clone, Copy, Debug, Eq, PartialEq)]
54pub(in crate::db) enum StoreVisit {
55    Continue,
56    Stop,
57}
58
59impl StoreVisit {
60    const fn should_stop(self) -> bool {
61        matches!(self, Self::Stop)
62    }
63}
64
65impl DataStore {
66    /// Initialize a data store with the provided backing memory.
67    #[must_use]
68    pub fn init(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
69        Self {
70            backend: DataStoreBackend::Stable(StableBTreeMap::init(memory)),
71        }
72    }
73
74    /// Initialize a volatile heap-backed data store.
75    #[must_use]
76    pub const fn init_heap() -> Self {
77        Self {
78            backend: DataStoreBackend::Heap(HeapBTreeMap::new()),
79        }
80    }
81
82    /// Insert or replace one row by raw key.
83    pub(in crate::db) fn insert(
84        &mut self,
85        key: RawDataStoreKey,
86        row: CanonicalRow,
87    ) -> Option<RawRow> {
88        let row = row.into_raw_row();
89        match &mut self.backend {
90            DataStoreBackend::Stable(map) => map.insert(key, row),
91            DataStoreBackend::Heap(map) => map.insert(key, row),
92        }
93    }
94
95    /// Insert one raw row directly for corruption-focused test setup only.
96    #[cfg(test)]
97    pub(in crate::db) fn insert_raw_for_test(
98        &mut self,
99        key: RawDataStoreKey,
100        row: RawRow,
101    ) -> Option<RawRow> {
102        match &mut self.backend {
103            DataStoreBackend::Stable(map) => map.insert(key, row),
104            DataStoreBackend::Heap(map) => map.insert(key, row),
105        }
106    }
107
108    /// Remove one row by raw key.
109    pub(in crate::db) fn remove(&mut self, key: &RawDataStoreKey) -> Option<RawRow> {
110        match &mut self.backend {
111            DataStoreBackend::Stable(map) => map.remove(key),
112            DataStoreBackend::Heap(map) => map.remove(key),
113        }
114    }
115
116    /// Load one row by raw key.
117    pub(in crate::db) fn get(&self, key: &RawDataStoreKey) -> Option<RawRow> {
118        #[cfg(feature = "diagnostics")]
119        record_data_store_get_call();
120
121        match &self.backend {
122            DataStoreBackend::Stable(map) => map.get(key),
123            DataStoreBackend::Heap(map) => map.get(key).cloned(),
124        }
125    }
126
127    /// Return whether one raw key exists without cloning the row payload.
128    #[must_use]
129    pub(in crate::db) fn contains(&self, key: &RawDataStoreKey) -> bool {
130        match &self.backend {
131            DataStoreBackend::Stable(map) => map.contains_key(key),
132            DataStoreBackend::Heap(map) => map.contains_key(key),
133        }
134    }
135
136    /// Clear all stored rows from the data store.
137    #[cfg(test)]
138    pub(in crate::db) fn clear(&mut self) {
139        match &mut self.backend {
140            DataStoreBackend::Stable(map) => map.clear_new(),
141            DataStoreBackend::Heap(map) => map.clear(),
142        }
143    }
144
145    /// Return the number of stored rows without exposing the backing map.
146    #[must_use]
147    pub(in crate::db) fn len(&self) -> u64 {
148        match &self.backend {
149            DataStoreBackend::Stable(map) => map.len(),
150            DataStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
151        }
152    }
153
154    /// Return whether the data store currently contains no rows.
155    #[must_use]
156    #[cfg(test)]
157    pub(in crate::db) fn is_empty(&self) -> bool {
158        match &self.backend {
159            DataStoreBackend::Stable(map) => map.is_empty(),
160            DataStoreBackend::Heap(map) => map.is_empty(),
161        }
162    }
163
164    /// Visit raw row entries in canonical storage order.
165    pub(in crate::db) fn visit_entries<E>(
166        &self,
167        mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
168    ) -> Result<(), E> {
169        match &self.backend {
170            DataStoreBackend::Stable(map) => {
171                for entry in map.iter() {
172                    if visitor(entry.key(), &entry.value())?.should_stop() {
173                        break;
174                    }
175                }
176            }
177            DataStoreBackend::Heap(map) => {
178                for (key, row) in map {
179                    if visitor(key, row)?.should_stop() {
180                        break;
181                    }
182                }
183            }
184        }
185
186        Ok(())
187    }
188
189    /// Visit raw row entries in reverse canonical storage order.
190    pub(in crate::db) fn visit_entries_rev<E>(
191        &self,
192        mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
193    ) -> Result<(), E> {
194        match &self.backend {
195            DataStoreBackend::Stable(map) => {
196                for entry in map.iter().rev() {
197                    if visitor(entry.key(), &entry.value())?.should_stop() {
198                        break;
199                    }
200                }
201            }
202            DataStoreBackend::Heap(map) => {
203                for (key, row) in map.iter().rev() {
204                    if visitor(key, row)?.should_stop() {
205                        break;
206                    }
207                }
208            }
209        }
210
211        Ok(())
212    }
213
214    /// Visit raw row entries whose keys belong to the provided storage range.
215    pub(in crate::db) fn visit_range<E>(
216        &self,
217        key_range: impl RangeBounds<RawDataStoreKey>,
218        mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
219    ) -> Result<(), E> {
220        match &self.backend {
221            DataStoreBackend::Stable(map) => {
222                for entry in map.range(key_range) {
223                    if visitor(entry.key(), &entry.value())?.should_stop() {
224                        break;
225                    }
226                }
227            }
228            DataStoreBackend::Heap(map) => {
229                for (key, row) in map.range(key_range) {
230                    if visitor(key, row)?.should_stop() {
231                        break;
232                    }
233                }
234            }
235        }
236
237        Ok(())
238    }
239
240    /// Visit raw row entries in reverse order whose keys belong to the provided storage range.
241    pub(in crate::db) fn visit_range_rev<E>(
242        &self,
243        key_range: impl RangeBounds<RawDataStoreKey>,
244        mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
245    ) -> Result<(), E> {
246        match &self.backend {
247            DataStoreBackend::Stable(map) => {
248                for entry in map.range(key_range).rev() {
249                    if visitor(entry.key(), &entry.value())?.should_stop() {
250                        break;
251                    }
252                }
253            }
254            DataStoreBackend::Heap(map) => {
255                for (key, row) in map.range(key_range).rev() {
256                    if visitor(key, row)?.should_stop() {
257                        break;
258                    }
259                }
260            }
261        }
262
263        Ok(())
264    }
265
266    /// Visit raw row entries for one entity using compact prefix bounds.
267    pub(in crate::db) fn visit_entity<E>(
268        &self,
269        entity: EntityTag,
270        visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
271    ) -> Result<(), E> {
272        let range = RawDataStoreKeyRange::entity_prefix(entity);
273        self.visit_range(RawDataStoreKey::store_range_bounds(&range), visitor)
274    }
275
276    /// Sum of bytes used by all stored rows.
277    pub(in crate::db) fn memory_bytes(&self) -> u64 {
278        // Report map footprint as key bytes + row bytes per entry.
279        let mut bytes = 0u64;
280        let _: Result<(), Infallible> = self.visit_entries(|key, row| {
281            bytes = bytes.saturating_add(key.as_bytes().len() as u64 + row.len() as u64);
282            Ok(StoreVisit::Continue)
283        });
284        bytes
285    }
286
287    /// Return the monotonic perf-only count of stable row fetches seen by this process.
288    #[cfg(feature = "diagnostics")]
289    pub(in crate::db) fn current_get_call_count() -> u64 {
290        DATA_STORE_GET_CALL_COUNT.with(Cell::get)
291    }
292}
293
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        db::{
            data::DecodedDataStoreKey,
            key_taxonomy::{PrimaryKeyComponent, PrimaryKeyValue},
        },
        testing::test_memory,
    };
    use std::ops::Bound;

    /// Encode a raw storage key for `entity` with a scalar `Nat64` primary key `id`.
    fn raw_key(entity: u64, id: u64) -> RawDataStoreKey {
        DecodedDataStoreKey::new(
            EntityTag::new(entity),
            &PrimaryKeyValue::Scalar(PrimaryKeyComponent::Nat64(id)),
        )
        .to_raw()
        .expect("test data key should encode")
    }

    /// Build a one-byte raw row so each entry is identifiable by its payload.
    fn raw_row(value: u8) -> RawRow {
        RawRow::try_new(vec![value]).expect("test raw row should be bounded")
    }

    /// Create a stable-backed store on `memory_id` and insert `(entity, id, row byte)` entries.
    fn seed_store(memory_id: u8, entries: &[(u64, u64, u8)]) -> DataStore {
        let mut store = DataStore::init(test_memory(memory_id));
        for (entity, id, row) in entries {
            store.insert_raw_for_test(raw_key(*entity, *id), raw_row(*row));
        }
        store
    }

    /// Create a heap-backed store and insert `(entity, id, row byte)` entries.
    fn seed_heap_store(entries: &[(u64, u64, u8)]) -> DataStore {
        let mut store = DataStore::init_heap();
        for (entity, id, row) in entries {
            store.insert_raw_for_test(raw_key(*entity, *id), raw_row(*row));
        }
        store
    }

    /// Collect every key in the order `visit_entries` yields them.
    fn collect_keys(store: &DataStore) -> Vec<RawDataStoreKey> {
        let mut keys = Vec::new();
        let _: Result<(), Infallible> = store.visit_entries(|key, _row| {
            keys.push(key.clone());
            Ok(StoreVisit::Continue)
        });
        keys
    }

    /// Collect every key in the order `visit_entries_rev` yields them.
    fn collect_keys_rev(store: &DataStore) -> Vec<RawDataStoreKey> {
        let mut keys = Vec::new();
        let _: Result<(), Infallible> = store.visit_entries_rev(|key, _row| {
            keys.push(key.clone());
            Ok(StoreVisit::Continue)
        });
        keys
    }

    #[test]
    fn data_store_visit_entries_preserves_storage_key_order() {
        let store = seed_store(221, &[(2, 1, 21), (1, 3, 13), (1, 1, 11), (1, 2, 12)]);

        let mut expected = vec![raw_key(2, 1), raw_key(1, 3), raw_key(1, 1), raw_key(1, 2)];
        expected.sort();

        assert_eq!(collect_keys(&store), expected);

        // Reverse traversal must yield the exact mirror of the forward order.
        expected.reverse();
        assert_eq!(collect_keys_rev(&store), expected);
    }

    #[test]
    fn data_store_visit_range_preserves_raw_key_bounds() {
        let store = seed_store(222, &[(1, 1, 11), (1, 2, 12), (1, 3, 13), (1, 4, 14)]);

        let mut visited = Vec::new();
        let _: Result<(), Infallible> = store.visit_range(
            (
                Bound::Included(raw_key(1, 2)),
                Bound::Excluded(raw_key(1, 4)),
            ),
            |key, row| {
                visited.push((key.clone(), row.as_bytes()[0]));
                Ok(StoreVisit::Continue)
            },
        );

        assert_eq!(visited, vec![(raw_key(1, 2), 12), (raw_key(1, 3), 13)]);

        // The same bounds walked in reverse select the same rows, last key first.
        let mut visited_rev = Vec::new();
        let _: Result<(), Infallible> = store.visit_range_rev(
            (
                Bound::Included(raw_key(1, 2)),
                Bound::Excluded(raw_key(1, 4)),
            ),
            |key, row| {
                visited_rev.push((key.clone(), row.as_bytes()[0]));
                Ok(StoreVisit::Continue)
            },
        );

        assert_eq!(
            visited_rev,
            vec![(raw_key(1, 3), 13), (raw_key(1, 2), 12)]
        );
    }

    #[test]
    fn data_store_visit_entity_preserves_compact_entity_prefix_bounds() {
        let store = seed_store(223, &[(2, 1, 21), (1, 2, 12), (2, 3, 23), (1, 1, 11)]);

        let mut visited = Vec::new();
        let _: Result<(), Infallible> = store.visit_entity(EntityTag::new(2), |key, row| {
            visited.push((key.clone(), row.as_bytes()[0]));
            Ok(StoreVisit::Continue)
        });

        assert_eq!(visited, vec![(raw_key(2, 1), 21), (raw_key(2, 3), 23)]);
    }

    #[test]
    fn data_store_visit_entries_can_stop_without_error() {
        let store = seed_store(224, &[(1, 1, 11), (1, 2, 12), (1, 3, 13)]);

        let mut visited = Vec::new();
        let _: Result<(), Infallible> = store.visit_entries(|key, _row| {
            visited.push(key.clone());
            Ok(if visited.len() == 2 {
                StoreVisit::Stop
            } else {
                StoreVisit::Continue
            })
        });

        assert_eq!(visited, vec![raw_key(1, 1), raw_key(1, 2)]);
    }

    #[test]
    fn heap_data_store_preserves_order_bounds_and_early_stop() {
        let store = seed_heap_store(&[(2, 1, 21), (1, 3, 13), (1, 1, 11), (1, 2, 12)]);

        let mut expected = vec![raw_key(2, 1), raw_key(1, 3), raw_key(1, 1), raw_key(1, 2)];
        expected.sort();
        assert_eq!(collect_keys(&store), expected);

        // Reverse traversal on the heap backend mirrors the forward order.
        expected.reverse();
        assert_eq!(collect_keys_rev(&store), expected);

        let mut ranged = Vec::new();
        let _: Result<(), Infallible> = store.visit_range(
            (
                Bound::Included(raw_key(1, 1)),
                Bound::Excluded(raw_key(1, 3)),
            ),
            |key, row| {
                ranged.push((key.clone(), row.as_bytes()[0]));
                Ok(StoreVisit::Continue)
            },
        );
        assert_eq!(ranged, vec![(raw_key(1, 1), 11), (raw_key(1, 2), 12)]);

        // Reverse range on the heap backend: same bounds, last key first.
        let mut ranged_rev = Vec::new();
        let _: Result<(), Infallible> = store.visit_range_rev(
            (
                Bound::Included(raw_key(1, 1)),
                Bound::Excluded(raw_key(1, 3)),
            ),
            |key, row| {
                ranged_rev.push((key.clone(), row.as_bytes()[0]));
                Ok(StoreVisit::Continue)
            },
        );
        assert_eq!(ranged_rev, vec![(raw_key(1, 2), 12), (raw_key(1, 1), 11)]);

        let mut entity = Vec::new();
        let _: Result<(), Infallible> = store.visit_entity(EntityTag::new(2), |key, row| {
            entity.push((key.clone(), row.as_bytes()[0]));
            Ok(StoreVisit::Continue)
        });
        assert_eq!(entity, vec![(raw_key(2, 1), 21)]);

        let mut stopped = Vec::new();
        let _: Result<(), Infallible> = store.visit_entries(|key, _| {
            stopped.push(key.clone());
            Ok(if stopped.len() == 2 {
                StoreVisit::Stop
            } else {
                StoreVisit::Continue
            })
        });
        assert_eq!(stopped, vec![raw_key(1, 1), raw_key(1, 2)]);
    }
}
442}