1use crate::{
7 db::{
8 data::{CanonicalRow, RawDataStoreKey, RawRow},
9 key_taxonomy::RawDataStoreKeyRange,
10 },
11 types::EntityTag,
12};
13use ic_memory::stable_structures::{
14 BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
15};
16#[cfg(feature = "diagnostics")]
17use std::cell::Cell;
18use std::collections::BTreeMap as HeapBTreeMap;
19use std::convert::Infallible;
20use std::ops::RangeBounds;
21
#[cfg(feature = "diagnostics")]
thread_local! {
    /// Per-thread tally of `DataStore::get` invocations, starting at zero.
    ///
    /// Only compiled when the `diagnostics` feature is enabled.
    static DATA_STORE_GET_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
}

/// Bumps the thread-local `get` call tally by one, saturating at `u64::MAX`.
#[cfg(feature = "diagnostics")]
fn record_data_store_get_call() {
    DATA_STORE_GET_CALL_COUNT.with(|calls| {
        let bumped = calls.get().saturating_add(1);
        calls.set(bumped);
    });
}
33
/// Ordered store of raw rows keyed by `RawDataStoreKey`.
///
/// The actual map lives in a private backend, which is either a
/// stable-structures B-tree over a virtual memory or an in-heap `BTreeMap`;
/// every method dispatches on that backend.
pub struct DataStore {
    // Which concrete map holds the entries (stable-structures or heap).
    backend: DataStoreBackend,
}
46
/// Concrete storage behind a `DataStore`.
///
/// Both variants map `RawDataStoreKey` to `RawRow`.
enum DataStoreBackend {
    /// Stable-structures B-tree map living in a `VirtualMemory<DefaultMemoryImpl>`.
    Stable(StableBTreeMap<RawDataStoreKey, RawRow, VirtualMemory<DefaultMemoryImpl>>),
    /// Standard-library `BTreeMap` held on the heap.
    Heap(HeapBTreeMap<RawDataStoreKey, RawRow>),
}
51
52#[derive(Clone, Copy, Debug, Eq, PartialEq)]
54pub(in crate::db) enum StoreVisit {
55 Continue,
56 Stop,
57}
58
59impl StoreVisit {
60 const fn should_stop(self) -> bool {
61 matches!(self, Self::Stop)
62 }
63}
64
65impl DataStore {
66 #[must_use]
68 pub fn init(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
69 Self {
70 backend: DataStoreBackend::Stable(StableBTreeMap::init(memory)),
71 }
72 }
73
74 #[must_use]
76 pub const fn init_heap() -> Self {
77 Self {
78 backend: DataStoreBackend::Heap(HeapBTreeMap::new()),
79 }
80 }
81
82 #[must_use]
84 pub(in crate::db) const fn is_heap_storage(&self) -> bool {
85 matches!(self.backend, DataStoreBackend::Heap(_))
86 }
87
88 pub(in crate::db) fn insert(
90 &mut self,
91 key: RawDataStoreKey,
92 row: CanonicalRow,
93 ) -> Option<RawRow> {
94 let row = row.into_raw_row();
95 match &mut self.backend {
96 DataStoreBackend::Stable(map) => map.insert(key, row),
97 DataStoreBackend::Heap(map) => map.insert(key, row),
98 }
99 }
100
/// Test-only insert that takes an already-raw row, skipping the
/// `CanonicalRow` conversion performed by `insert`.
///
/// Returns the row previously stored under `key`, if any.
#[cfg(test)]
pub(in crate::db) fn insert_raw_for_test(
    &mut self,
    key: RawDataStoreKey,
    row: RawRow,
) -> Option<RawRow> {
    match self.backend {
        DataStoreBackend::Stable(ref mut stable) => stable.insert(key, row),
        DataStoreBackend::Heap(ref mut heap) => heap.insert(key, row),
    }
}
113
114 pub(in crate::db) fn remove(&mut self, key: &RawDataStoreKey) -> Option<RawRow> {
116 match &mut self.backend {
117 DataStoreBackend::Stable(map) => map.remove(key),
118 DataStoreBackend::Heap(map) => map.remove(key),
119 }
120 }
121
122 pub(in crate::db) fn get(&self, key: &RawDataStoreKey) -> Option<RawRow> {
124 #[cfg(feature = "diagnostics")]
125 record_data_store_get_call();
126
127 match &self.backend {
128 DataStoreBackend::Stable(map) => map.get(key),
129 DataStoreBackend::Heap(map) => map.get(key).cloned(),
130 }
131 }
132
133 #[must_use]
135 pub(in crate::db) fn contains(&self, key: &RawDataStoreKey) -> bool {
136 match &self.backend {
137 DataStoreBackend::Stable(map) => map.contains_key(key),
138 DataStoreBackend::Heap(map) => map.contains_key(key),
139 }
140 }
141
/// Test-only: drops every entry from the store.
#[cfg(test)]
pub(in crate::db) fn clear(&mut self) {
    match self.backend {
        DataStoreBackend::Stable(ref mut stable) => stable.clear_new(),
        DataStoreBackend::Heap(ref mut heap) => heap.clear(),
    }
}
150
151 #[must_use]
153 pub(in crate::db) fn len(&self) -> u64 {
154 match &self.backend {
155 DataStoreBackend::Stable(map) => map.len(),
156 DataStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
157 }
158 }
159
/// Test-only: reports whether the store holds no entries at all.
#[cfg(test)]
#[must_use]
pub(in crate::db) fn is_empty(&self) -> bool {
    match self.backend {
        DataStoreBackend::Stable(ref stable) => stable.is_empty(),
        DataStoreBackend::Heap(ref heap) => heap.is_empty(),
    }
}
169
170 pub(in crate::db) fn visit_entries<E>(
172 &self,
173 mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
174 ) -> Result<(), E> {
175 match &self.backend {
176 DataStoreBackend::Stable(map) => {
177 for entry in map.iter() {
178 if visitor(entry.key(), &entry.value())?.should_stop() {
179 break;
180 }
181 }
182 }
183 DataStoreBackend::Heap(map) => {
184 for (key, row) in map {
185 if visitor(key, row)?.should_stop() {
186 break;
187 }
188 }
189 }
190 }
191
192 Ok(())
193 }
194
195 pub(in crate::db) fn visit_entries_rev<E>(
197 &self,
198 mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
199 ) -> Result<(), E> {
200 match &self.backend {
201 DataStoreBackend::Stable(map) => {
202 for entry in map.iter().rev() {
203 if visitor(entry.key(), &entry.value())?.should_stop() {
204 break;
205 }
206 }
207 }
208 DataStoreBackend::Heap(map) => {
209 for (key, row) in map.iter().rev() {
210 if visitor(key, row)?.should_stop() {
211 break;
212 }
213 }
214 }
215 }
216
217 Ok(())
218 }
219
220 pub(in crate::db) fn visit_range<E>(
222 &self,
223 key_range: impl RangeBounds<RawDataStoreKey>,
224 mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
225 ) -> Result<(), E> {
226 match &self.backend {
227 DataStoreBackend::Stable(map) => {
228 for entry in map.range(key_range) {
229 if visitor(entry.key(), &entry.value())?.should_stop() {
230 break;
231 }
232 }
233 }
234 DataStoreBackend::Heap(map) => {
235 for (key, row) in map.range(key_range) {
236 if visitor(key, row)?.should_stop() {
237 break;
238 }
239 }
240 }
241 }
242
243 Ok(())
244 }
245
246 pub(in crate::db) fn visit_range_rev<E>(
248 &self,
249 key_range: impl RangeBounds<RawDataStoreKey>,
250 mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
251 ) -> Result<(), E> {
252 match &self.backend {
253 DataStoreBackend::Stable(map) => {
254 for entry in map.range(key_range).rev() {
255 if visitor(entry.key(), &entry.value())?.should_stop() {
256 break;
257 }
258 }
259 }
260 DataStoreBackend::Heap(map) => {
261 for (key, row) in map.range(key_range).rev() {
262 if visitor(key, row)?.should_stop() {
263 break;
264 }
265 }
266 }
267 }
268
269 Ok(())
270 }
271
272 pub(in crate::db) fn visit_entity<E>(
274 &self,
275 entity: EntityTag,
276 visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
277 ) -> Result<(), E> {
278 let range = RawDataStoreKeyRange::entity_prefix(entity);
279 self.visit_range(RawDataStoreKey::store_range_bounds(&range), visitor)
280 }
281
282 pub(in crate::db) fn memory_bytes(&self) -> u64 {
284 let mut bytes = 0u64;
286 let _: Result<(), Infallible> = self.visit_entries(|key, row| {
287 bytes = bytes.saturating_add(key.as_bytes().len() as u64 + row.len() as u64);
288 Ok(StoreVisit::Continue)
289 });
290 bytes
291 }
292
/// Reads the current thread's tally of `get` calls.
///
/// Only available when the `diagnostics` feature is enabled.
#[cfg(feature = "diagnostics")]
pub(in crate::db) fn current_get_call_count() -> u64 {
    DATA_STORE_GET_CALL_COUNT.with(|calls| calls.get())
}
298}
299
300#[cfg(test)]
301mod tests {
302 use super::*;
303 use crate::{
304 db::{
305 data::DecodedDataStoreKey,
306 key_taxonomy::{PrimaryKeyComponent, PrimaryKeyValue},
307 },
308 testing::test_memory,
309 };
310 use std::ops::Bound;
311
312 fn raw_key(entity: u64, id: u64) -> RawDataStoreKey {
313 DecodedDataStoreKey::new(
314 EntityTag::new(entity),
315 &PrimaryKeyValue::Scalar(PrimaryKeyComponent::Nat64(id)),
316 )
317 .to_raw()
318 .expect("test data key should encode")
319 }
320
321 fn raw_row(value: u8) -> RawRow {
322 RawRow::try_new(vec![value]).expect("test raw row should be bounded")
323 }
324
325 fn seed_store(memory_id: u8, entries: &[(u64, u64, u8)]) -> DataStore {
326 let mut store = DataStore::init(test_memory(memory_id));
327 for (entity, id, row) in entries {
328 store.insert_raw_for_test(raw_key(*entity, *id), raw_row(*row));
329 }
330 store
331 }
332
333 fn seed_heap_store(entries: &[(u64, u64, u8)]) -> DataStore {
334 let mut store = DataStore::init_heap();
335 for (entity, id, row) in entries {
336 store.insert_raw_for_test(raw_key(*entity, *id), raw_row(*row));
337 }
338 store
339 }
340
341 fn collect_keys(store: &DataStore) -> Vec<RawDataStoreKey> {
342 let mut keys = Vec::new();
343 let _: Result<(), Infallible> = store.visit_entries(|key, _row| {
344 keys.push(key.clone());
345 Ok(StoreVisit::Continue)
346 });
347 keys
348 }
349
/// A full scan of a stable-backed store yields keys in sorted raw-key order,
/// regardless of insertion order.
#[test]
fn data_store_visit_entries_preserves_storage_key_order() {
    let seeded: [(u64, u64, u8); 4] = [(2, 1, 21), (1, 3, 13), (1, 1, 11), (1, 2, 12)];
    let store = seed_store(221, &seeded);

    let mut expected: Vec<RawDataStoreKey> = seeded
        .iter()
        .map(|&(entity, id, _)| raw_key(entity, id))
        .collect();
    expected.sort();

    assert_eq!(collect_keys(&store), expected);
}
359
/// A range scan honours an inclusive lower and exclusive upper raw-key bound.
#[test]
fn data_store_visit_range_preserves_raw_key_bounds() {
    let store = seed_store(222, &[(1, 1, 11), (1, 2, 12), (1, 3, 13), (1, 4, 14)]);

    let bounds = (
        Bound::Included(raw_key(1, 2)),
        Bound::Excluded(raw_key(1, 4)),
    );
    let mut visited = Vec::new();
    let _: Result<(), Infallible> = store.visit_range(bounds, |key, row| {
        let first_byte = row.as_bytes()[0];
        visited.push((key.clone(), first_byte));
        Ok(StoreVisit::Continue)
    });

    assert_eq!(visited, vec![(raw_key(1, 2), 12), (raw_key(1, 3), 13)]);
}
378
/// An entity scan yields only the rows of the requested entity, in key order.
#[test]
fn data_store_visit_entity_preserves_compact_entity_prefix_bounds() {
    let store = seed_store(223, &[(2, 1, 21), (1, 2, 12), (2, 3, 23), (1, 1, 11)]);

    let target = EntityTag::new(2);
    let mut visited = Vec::new();
    let _: Result<(), Infallible> = store.visit_entity(target, |key, row| {
        let first_byte = row.as_bytes()[0];
        visited.push((key.clone(), first_byte));
        Ok(StoreVisit::Continue)
    });

    assert_eq!(visited, vec![(raw_key(2, 1), 21), (raw_key(2, 3), 23)]);
}
391
/// Returning `Stop` after the second entry ends the scan there without an error.
#[test]
fn data_store_visit_entries_can_stop_without_error() {
    let store = seed_store(224, &[(1, 1, 11), (1, 2, 12), (1, 3, 13)]);

    let mut visited = Vec::new();
    let _: Result<(), Infallible> = store.visit_entries(|key, _| {
        visited.push(key.clone());
        let verdict = match visited.len() {
            2 => StoreVisit::Stop,
            _ => StoreVisit::Continue,
        };
        Ok(verdict)
    });

    assert_eq!(visited, vec![raw_key(1, 1), raw_key(1, 2)]);
}
408
/// The heap backend matches the stable backend for full-scan ordering, range
/// bounds, entity scans and early stop.
#[test]
fn heap_data_store_preserves_order_bounds_and_early_stop() {
    let seeded: [(u64, u64, u8); 4] = [(2, 1, 21), (1, 3, 13), (1, 1, 11), (1, 2, 12)];
    let store = seed_heap_store(&seeded);

    // Full scan: keys come back sorted.
    let mut expected: Vec<RawDataStoreKey> = seeded
        .iter()
        .map(|&(entity, id, _)| raw_key(entity, id))
        .collect();
    expected.sort();
    assert_eq!(collect_keys(&store), expected);

    // Range scan: inclusive lower bound, exclusive upper bound.
    let bounds = (
        Bound::Included(raw_key(1, 1)),
        Bound::Excluded(raw_key(1, 3)),
    );
    let mut ranged = Vec::new();
    let _: Result<(), Infallible> = store.visit_range(bounds, |key, row| {
        let first_byte = row.as_bytes()[0];
        ranged.push((key.clone(), first_byte));
        Ok(StoreVisit::Continue)
    });
    assert_eq!(ranged, vec![(raw_key(1, 1), 11), (raw_key(1, 2), 12)]);

    // Entity scan: only entity 2 rows.
    let target = EntityTag::new(2);
    let mut entity = Vec::new();
    let _: Result<(), Infallible> = store.visit_entity(target, |key, row| {
        let first_byte = row.as_bytes()[0];
        entity.push((key.clone(), first_byte));
        Ok(StoreVisit::Continue)
    });
    assert_eq!(entity, vec![(raw_key(2, 1), 21)]);

    // Early stop: the scan ends right after the second key.
    let mut stopped = Vec::new();
    let _: Result<(), Infallible> = store.visit_entries(|key, _| {
        stopped.push(key.clone());
        let verdict = match stopped.len() {
            2 => StoreVisit::Stop,
            _ => StoreVisit::Continue,
        };
        Ok(verdict)
    });
    assert_eq!(stopped, vec![raw_key(1, 1), raw_key(1, 2)]);
}
448}