1use crate::{
7 db::{
8 data::{CanonicalRow, RawDataStoreKey, RawRow},
9 direction::Direction,
10 key_taxonomy::RawDataStoreKeyRange,
11 ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
12 },
13 types::EntityTag,
14};
15use ic_memory::stable_structures::{
16 BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
17};
18#[cfg(feature = "diagnostics")]
19use std::cell::Cell;
20use std::collections::{BTreeMap as HeapBTreeMap, BTreeSet};
21use std::convert::Infallible;
22use std::ops::{Bound, RangeBounds};
23
24#[cfg(feature = "diagnostics")]
25thread_local! {
26 static DATA_STORE_GET_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
27}
28
29#[cfg(feature = "diagnostics")]
30fn record_data_store_get_call() {
31 DATA_STORE_GET_CALL_COUNT.with(|count| {
32 count.set(count.get().saturating_add(1));
33 });
34}
35
36pub struct DataStore {
46 backend: DataStoreBackend,
47}
48
49enum DataStoreBackend {
50 Stable(StableBTreeMap<RawDataStoreKey, RawRow, VirtualMemory<DefaultMemoryImpl>>),
51 Heap(HeapBTreeMap<RawDataStoreKey, RawRow>),
52 Journaled {
53 canonical: StableBTreeMap<RawDataStoreKey, RawRow, VirtualMemory<DefaultMemoryImpl>>,
54 live: HeapBTreeMap<RawDataStoreKey, RawRow>,
55 tombstones: BTreeSet<RawDataStoreKey>,
56 },
57}
58
59#[derive(Clone, Copy, Debug, Eq, PartialEq)]
61pub(in crate::db) enum StoreVisit {
62 Continue,
63 Stop,
64}
65
66impl StoreVisit {
67 const fn should_stop(self) -> bool {
68 matches!(self, Self::Stop)
69 }
70}
71
72impl DataStore {
73 #[must_use]
75 pub fn init(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
76 Self {
77 backend: DataStoreBackend::Stable(StableBTreeMap::init(memory)),
78 }
79 }
80
81 #[must_use]
83 pub const fn init_heap() -> Self {
84 Self {
85 backend: DataStoreBackend::Heap(HeapBTreeMap::new()),
86 }
87 }
88
89 #[must_use]
95 pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
96 Self {
97 backend: DataStoreBackend::Journaled {
98 canonical: StableBTreeMap::init(memory),
99 live: HeapBTreeMap::new(),
100 tombstones: BTreeSet::new(),
101 },
102 }
103 }
104
105 pub(in crate::db) fn insert(
107 &mut self,
108 key: RawDataStoreKey,
109 row: CanonicalRow,
110 ) -> Option<RawRow> {
111 let row = row.into_raw_row();
112 let previous_journaled = if matches!(self.backend, DataStoreBackend::Journaled { .. }) {
113 self.get(&key)
114 } else {
115 None
116 };
117 match &mut self.backend {
118 DataStoreBackend::Stable(map) => map.insert(key, row),
119 DataStoreBackend::Heap(map) => map.insert(key, row),
120 DataStoreBackend::Journaled {
121 live, tombstones, ..
122 } => {
123 tombstones.remove(&key);
124 live.insert(key, row);
125 previous_journaled
126 }
127 }
128 }
129
130 #[cfg(test)]
132 pub(in crate::db) fn insert_raw_for_test(
133 &mut self,
134 key: RawDataStoreKey,
135 row: RawRow,
136 ) -> Option<RawRow> {
137 let previous_journaled = if matches!(self.backend, DataStoreBackend::Journaled { .. }) {
138 self.get(&key)
139 } else {
140 None
141 };
142 match &mut self.backend {
143 DataStoreBackend::Stable(map) => map.insert(key, row),
144 DataStoreBackend::Heap(map) => map.insert(key, row),
145 DataStoreBackend::Journaled {
146 live, tombstones, ..
147 } => {
148 tombstones.remove(&key);
149 live.insert(key, row);
150 previous_journaled
151 }
152 }
153 }
154
155 pub(in crate::db) fn remove(&mut self, key: &RawDataStoreKey) -> Option<RawRow> {
157 let previous_journaled = if matches!(self.backend, DataStoreBackend::Journaled { .. }) {
158 self.get(key)
159 } else {
160 None
161 };
162 match &mut self.backend {
163 DataStoreBackend::Stable(map) => map.remove(key),
164 DataStoreBackend::Heap(map) => map.remove(key),
165 DataStoreBackend::Journaled {
166 live, tombstones, ..
167 } => {
168 live.remove(key);
169 tombstones.insert(key.clone());
170 previous_journaled
171 }
172 }
173 }
174
175 pub(in crate::db) fn reset_journaled_live_projection(
178 &mut self,
179 ) -> Result<(), crate::error::InternalError> {
180 let DataStoreBackend::Journaled {
181 live, tombstones, ..
182 } = &mut self.backend
183 else {
184 return Err(crate::error::InternalError::store_invariant(
185 "journaled live projection reset requires a journaled data store",
186 ));
187 };
188
189 live.clear();
190 tombstones.clear();
191
192 Ok(())
193 }
194
195 pub(in crate::db) fn apply_recovered_journal_put(
197 &mut self,
198 key: RawDataStoreKey,
199 row: RawRow,
200 ) -> Result<Option<RawRow>, crate::error::InternalError> {
201 let DataStoreBackend::Journaled {
202 canonical,
203 live,
204 tombstones,
205 } = &mut self.backend
206 else {
207 return Err(crate::error::InternalError::store_invariant(
208 "journal row replay requires a journaled data store",
209 ));
210 };
211
212 let previous = if tombstones.contains(&key) {
213 None
214 } else {
215 live.get(&key).cloned().or_else(|| canonical.get(&key))
216 };
217 tombstones.remove(&key);
218 live.insert(key, row);
219
220 Ok(previous)
221 }
222
223 pub(in crate::db) fn apply_recovered_journal_delete(
225 &mut self,
226 key: &RawDataStoreKey,
227 ) -> Result<Option<RawRow>, crate::error::InternalError> {
228 let DataStoreBackend::Journaled {
229 canonical,
230 live,
231 tombstones,
232 } = &mut self.backend
233 else {
234 return Err(crate::error::InternalError::store_invariant(
235 "journal row replay requires a journaled data store",
236 ));
237 };
238
239 let previous = if tombstones.contains(key) {
240 None
241 } else {
242 live.get(key).cloned().or_else(|| canonical.get(key))
243 };
244 live.remove(key);
245 tombstones.insert(key.clone());
246
247 Ok(previous)
248 }
249
250 pub(in crate::db) fn fold_recovered_journal_put(
252 &mut self,
253 key: RawDataStoreKey,
254 row: RawRow,
255 ) -> Result<Option<RawRow>, crate::error::InternalError> {
256 let DataStoreBackend::Journaled { canonical, .. } = &mut self.backend else {
257 return Err(crate::error::InternalError::store_invariant(
258 "journal row fold requires a journaled data store",
259 ));
260 };
261
262 Ok(canonical.insert(key, row))
263 }
264
265 pub(in crate::db) fn fold_recovered_journal_delete(
267 &mut self,
268 key: &RawDataStoreKey,
269 ) -> Result<Option<RawRow>, crate::error::InternalError> {
270 let DataStoreBackend::Journaled { canonical, .. } = &mut self.backend else {
271 return Err(crate::error::InternalError::store_invariant(
272 "journal row fold requires a journaled data store",
273 ));
274 };
275
276 Ok(canonical.remove(key))
277 }
278
279 pub(in crate::db) fn get(&self, key: &RawDataStoreKey) -> Option<RawRow> {
281 #[cfg(feature = "diagnostics")]
282 record_data_store_get_call();
283
284 match &self.backend {
285 DataStoreBackend::Stable(map) => map.get(key),
286 DataStoreBackend::Heap(map) => map.get(key).cloned(),
287 DataStoreBackend::Journaled { .. } => Self::journaled_get_raw(&self.backend, key),
288 }
289 }
290
291 #[must_use]
293 pub(in crate::db) fn contains(&self, key: &RawDataStoreKey) -> bool {
294 match &self.backend {
295 DataStoreBackend::Stable(map) => map.contains_key(key),
296 DataStoreBackend::Heap(map) => map.contains_key(key),
297 DataStoreBackend::Journaled { .. } => {
298 Self::journaled_get_raw(&self.backend, key).is_some()
299 }
300 }
301 }
302
303 #[cfg(test)]
305 pub(in crate::db) fn clear(&mut self) {
306 match &mut self.backend {
307 DataStoreBackend::Stable(map) => map.clear_new(),
308 DataStoreBackend::Heap(map) => map.clear(),
309 DataStoreBackend::Journaled {
310 canonical,
311 live,
312 tombstones,
313 } => {
314 live.clear();
315 tombstones.clear();
316 for entry in canonical.iter() {
317 tombstones.insert(entry.key().clone());
318 }
319 }
320 }
321 }
322
323 #[must_use]
325 pub(in crate::db) fn len(&self) -> u64 {
326 match &self.backend {
327 DataStoreBackend::Stable(map) => map.len(),
328 DataStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
329 DataStoreBackend::Journaled { .. } => {
330 let mut count = 0_u64;
331 let _: Result<(), Infallible> = self.visit_entries(|_key, _row| {
332 count = count.saturating_add(1);
333 Ok(StoreVisit::Continue)
334 });
335 count
336 }
337 }
338 }
339
340 #[must_use]
342 #[cfg(test)]
343 pub(in crate::db) fn is_empty(&self) -> bool {
344 match &self.backend {
345 DataStoreBackend::Stable(map) => map.is_empty(),
346 DataStoreBackend::Heap(map) => map.is_empty(),
347 DataStoreBackend::Journaled { .. } => {
348 let mut empty = true;
349 let _: Result<(), Infallible> = self.visit_entries(|_key, _row| {
350 empty = false;
351 Ok(StoreVisit::Stop)
352 });
353 empty
354 }
355 }
356 }
357
358 pub(in crate::db) fn visit_entries<E>(
360 &self,
361 mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
362 ) -> Result<(), E> {
363 match &self.backend {
364 DataStoreBackend::Stable(map) => {
365 for entry in map.iter() {
366 if visitor(entry.key(), &entry.value())?.should_stop() {
367 break;
368 }
369 }
370 }
371 DataStoreBackend::Heap(map) => {
372 for (key, row) in map {
373 if visitor(key, row)?.should_stop() {
374 break;
375 }
376 }
377 }
378 DataStoreBackend::Journaled {
379 canonical: _,
380 live: _,
381 tombstones: _,
382 } => Self::visit_journaled_entries_in_bounds(
383 &self.backend,
384 (Bound::Unbounded, Bound::Unbounded),
385 false,
386 visitor,
387 )?,
388 }
389
390 Ok(())
391 }
392
393 pub(in crate::db) fn visit_entries_rev<E>(
395 &self,
396 mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
397 ) -> Result<(), E> {
398 match &self.backend {
399 DataStoreBackend::Stable(map) => {
400 for entry in map.iter().rev() {
401 if visitor(entry.key(), &entry.value())?.should_stop() {
402 break;
403 }
404 }
405 }
406 DataStoreBackend::Heap(map) => {
407 for (key, row) in map.iter().rev() {
408 if visitor(key, row)?.should_stop() {
409 break;
410 }
411 }
412 }
413 DataStoreBackend::Journaled {
414 canonical: _,
415 live: _,
416 tombstones: _,
417 } => Self::visit_journaled_entries_in_bounds(
418 &self.backend,
419 (Bound::Unbounded, Bound::Unbounded),
420 true,
421 visitor,
422 )?,
423 }
424
425 Ok(())
426 }
427
428 pub(in crate::db) fn visit_range<E>(
430 &self,
431 key_range: impl RangeBounds<RawDataStoreKey>,
432 mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
433 ) -> Result<(), E> {
434 let bounds = Self::owned_range_bounds(&key_range);
435 match &self.backend {
436 DataStoreBackend::Stable(map) => {
437 for entry in map.range((bounds.0.clone(), bounds.1)) {
438 if visitor(entry.key(), &entry.value())?.should_stop() {
439 break;
440 }
441 }
442 }
443 DataStoreBackend::Heap(map) => {
444 for (key, row) in map.range((bounds.0.clone(), bounds.1)) {
445 if visitor(key, row)?.should_stop() {
446 break;
447 }
448 }
449 }
450 DataStoreBackend::Journaled {
451 canonical: _,
452 live: _,
453 tombstones: _,
454 } => Self::visit_journaled_entries_in_bounds(&self.backend, bounds, false, visitor)?,
455 }
456
457 Ok(())
458 }
459
460 pub(in crate::db) fn visit_range_rev<E>(
462 &self,
463 key_range: impl RangeBounds<RawDataStoreKey>,
464 mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
465 ) -> Result<(), E> {
466 let bounds = Self::owned_range_bounds(&key_range);
467 match &self.backend {
468 DataStoreBackend::Stable(map) => {
469 for entry in map.range((bounds.0.clone(), bounds.1)).rev() {
470 if visitor(entry.key(), &entry.value())?.should_stop() {
471 break;
472 }
473 }
474 }
475 DataStoreBackend::Heap(map) => {
476 for (key, row) in map.range((bounds.0.clone(), bounds.1)).rev() {
477 if visitor(key, row)?.should_stop() {
478 break;
479 }
480 }
481 }
482 DataStoreBackend::Journaled {
483 canonical: _,
484 live: _,
485 tombstones: _,
486 } => Self::visit_journaled_entries_in_bounds(&self.backend, bounds, true, visitor)?,
487 }
488
489 Ok(())
490 }
491
492 pub(in crate::db) fn visit_entity<E>(
494 &self,
495 entity: EntityTag,
496 visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
497 ) -> Result<(), E> {
498 let range = RawDataStoreKeyRange::entity_prefix(entity);
499 self.visit_range(RawDataStoreKey::store_range_bounds(&range), visitor)
500 }
501
502 pub(in crate::db) fn memory_bytes(&self) -> u64 {
504 let mut bytes = 0u64;
506 let _: Result<(), Infallible> = self.visit_entries(|key, row| {
507 bytes = bytes.saturating_add(key.as_bytes().len() as u64 + row.len() as u64);
508 Ok(StoreVisit::Continue)
509 });
510 bytes
511 }
512
513 #[cfg(feature = "diagnostics")]
515 pub(in crate::db) fn current_get_call_count() -> u64 {
516 DATA_STORE_GET_CALL_COUNT.with(Cell::get)
517 }
518
519 #[cfg(test)]
520 #[must_use]
521 pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
522 match &self.backend {
523 DataStoreBackend::Stable(map) | DataStoreBackend::Journaled { canonical: map, .. } => {
524 map.len()
525 }
526 DataStoreBackend::Heap(_) => 0,
527 }
528 }
529
530 fn journaled_get_raw(backend: &DataStoreBackend, key: &RawDataStoreKey) -> Option<RawRow> {
531 let DataStoreBackend::Journaled {
532 canonical,
533 live,
534 tombstones,
535 } = backend
536 else {
537 return None;
538 };
539
540 if tombstones.contains(key) {
541 return None;
542 }
543 live.get(key).cloned().or_else(|| canonical.get(key))
544 }
545
546 fn owned_range_bounds(
547 key_range: &impl RangeBounds<RawDataStoreKey>,
548 ) -> (Bound<RawDataStoreKey>, Bound<RawDataStoreKey>) {
549 let lower = match key_range.start_bound() {
550 Bound::Included(key) => Bound::Included(key.clone()),
551 Bound::Excluded(key) => Bound::Excluded(key.clone()),
552 Bound::Unbounded => Bound::Unbounded,
553 };
554 let upper = match key_range.end_bound() {
555 Bound::Included(key) => Bound::Included(key.clone()),
556 Bound::Excluded(key) => Bound::Excluded(key.clone()),
557 Bound::Unbounded => Bound::Unbounded,
558 };
559
560 (lower, upper)
561 }
562
563 fn visit_journaled_entries_in_bounds<E>(
564 backend: &DataStoreBackend,
565 bounds: (Bound<RawDataStoreKey>, Bound<RawDataStoreKey>),
566 reverse: bool,
567 mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
568 ) -> Result<(), E> {
569 let DataStoreBackend::Journaled {
570 canonical,
571 live,
572 tombstones,
573 } = backend
574 else {
575 return Ok(());
576 };
577
578 if canonical.is_empty() {
579 if reverse {
580 for (key, row) in live.range(bounds).rev() {
581 if visitor(key, row)?.should_stop() {
582 return Ok(());
583 }
584 }
585 } else {
586 for (key, row) in live.range(bounds) {
587 if visitor(key, row)?.should_stop() {
588 return Ok(());
589 }
590 }
591 }
592 return Ok(());
593 }
594
595 if live.is_empty() && tombstones.is_empty() {
596 if reverse {
597 for entry in canonical.range(bounds).rev() {
598 if visitor(entry.key(), &entry.value())?.should_stop() {
599 return Ok(());
600 }
601 }
602 } else {
603 for entry in canonical.range(bounds) {
604 if visitor(entry.key(), &entry.value())?.should_stop() {
605 return Ok(());
606 }
607 }
608 }
609 return Ok(());
610 }
611
612 match if reverse {
613 Direction::Desc
614 } else {
615 Direction::Asc
616 } {
617 Direction::Asc => visit_ordered_overlay(
618 canonical.range((bounds.0.clone(), bounds.1.clone())),
619 live.range((bounds.0, bounds.1)),
620 Direction::Asc,
621 |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
622 |canonical_entry| !tombstones.contains(canonical_entry.key()),
623 |live_entry| !tombstones.contains(live_entry.0),
624 |entry| {
625 let visit = match entry {
626 OrderedOverlayEntry::Canonical(canonical_entry) => {
627 visitor(canonical_entry.key(), &canonical_entry.value())?
628 }
629 OrderedOverlayEntry::Live((key, row)) => visitor(key, row)?,
630 };
631 Ok(if visit.should_stop() {
632 OrderedOverlayVisit::Stop
633 } else {
634 OrderedOverlayVisit::Continue
635 })
636 },
637 ),
638 Direction::Desc => visit_ordered_overlay(
639 canonical.range((bounds.0.clone(), bounds.1.clone())).rev(),
640 live.range((bounds.0, bounds.1)).rev(),
641 Direction::Desc,
642 |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
643 |canonical_entry| !tombstones.contains(canonical_entry.key()),
644 |live_entry| !tombstones.contains(live_entry.0),
645 |entry| {
646 let visit = match entry {
647 OrderedOverlayEntry::Canonical(canonical_entry) => {
648 visitor(canonical_entry.key(), &canonical_entry.value())?
649 }
650 OrderedOverlayEntry::Live((key, row)) => visitor(key, row)?,
651 };
652 Ok(if visit.should_stop() {
653 OrderedOverlayVisit::Stop
654 } else {
655 OrderedOverlayVisit::Continue
656 })
657 },
658 ),
659 }
660 }
661}
662
663#[cfg(test)]
664mod tests;