1use crate::{
7 db::{
8 data::{CanonicalRow, RawDataStoreKey, RawRow},
9 direction::Direction,
10 key_taxonomy::RawDataStoreKeyRange,
11 ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
12 },
13 types::EntityTag,
14};
15use ic_memory::stable_structures::{
16 BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
17};
18#[cfg(feature = "diagnostics")]
19use std::cell::Cell;
20use std::collections::{BTreeMap as HeapBTreeMap, BTreeSet};
21use std::convert::Infallible;
22use std::ops::{Bound, RangeBounds};
23
// Per-thread tally of `DataStore::get` invocations. It only exists when the
// `diagnostics` feature is enabled and starts at zero on every thread.
#[cfg(feature = "diagnostics")]
thread_local! {
    static DATA_STORE_GET_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
}
28
/// Adds one to the thread-local `DataStore::get` call tally.
///
/// The tally saturates at `u64::MAX` instead of wrapping.
#[cfg(feature = "diagnostics")]
fn record_data_store_get_call() {
    DATA_STORE_GET_CALL_COUNT.with(|calls| {
        let next = calls.get().saturating_add(1);
        calls.set(next);
    });
}
35
/// Ordered key/row store backed either by a plain heap map or by a journaled
/// stable map with a heap overlay (see `DataStoreBackend`).
pub struct DataStore {
    /// Where the rows actually live.
    backend: DataStoreBackend,
    /// Counter bumped (saturating) by every mutating operation in this file.
    generation: u64,
    /// Per-entity tally of visible rows, maintained alongside each mutation.
    entity_cardinality: EntityCardinality,
}
50
/// Storage strategies supported by `DataStore`.
enum DataStoreBackend {
    /// All rows are held in a heap-allocated ordered map.
    Heap(HeapBTreeMap<RawDataStoreKey, RawRow>),
    /// Rows are held in a stable map and shadowed by heap-side overlay state.
    Journaled {
        /// Stable-structures map over a virtual memory region.
        canonical: StableBTreeMap<RawDataStoreKey, RawRow, VirtualMemory<DefaultMemoryImpl>>,
        /// Heap overlay; a row here takes precedence over `canonical` on reads.
        live: HeapBTreeMap<RawDataStoreKey, RawRow>,
        /// Keys that reads must treat as absent, whatever `canonical` holds.
        tombstones: BTreeSet<RawDataStoreKey>,
    },
}
59
/// Verdict returned by a row visitor after each visited entry.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) enum StoreVisit {
    /// Keep walking to the next entry.
    Continue,
    /// End the traversal after the current entry.
    Stop,
}
66
67impl StoreVisit {
68 const fn should_stop(self) -> bool {
69 matches!(self, Self::Stop)
70 }
71}
72
73impl DataStore {
74 #[must_use]
76 pub const fn init_heap() -> Self {
77 Self {
78 backend: DataStoreBackend::Heap(HeapBTreeMap::new()),
79 generation: 0,
80 entity_cardinality: EntityCardinality::empty(),
81 }
82 }
83
84 #[must_use]
90 pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
91 let mut store = Self {
92 backend: DataStoreBackend::Journaled {
93 canonical: StableBTreeMap::init(memory),
94 live: HeapBTreeMap::new(),
95 tombstones: BTreeSet::new(),
96 },
97 generation: 0,
98 entity_cardinality: EntityCardinality::empty(),
99 };
100 store.rebuild_entity_cardinality_from_entries();
101 store
102 }
103
104 pub(in crate::db) fn insert(
106 &mut self,
107 key: RawDataStoreKey,
108 row: CanonicalRow,
109 ) -> Option<RawRow> {
110 let row = row.into_raw_row();
111 let previous_journaled = if matches!(self.backend, DataStoreBackend::Journaled { .. }) {
112 self.get(&key)
113 } else {
114 None
115 };
116 let cardinality_key = key.clone();
117 let previous = match &mut self.backend {
118 DataStoreBackend::Heap(map) => map.insert(key, row),
119 DataStoreBackend::Journaled {
120 live, tombstones, ..
121 } => {
122 tombstones.remove(&key);
123 live.insert(key, row);
124 previous_journaled
125 }
126 };
127 self.entity_cardinality
128 .apply_insert(&cardinality_key, previous.as_ref());
129 self.bump_generation();
130 previous
131 }
132
133 #[cfg(test)]
135 pub(in crate::db) fn insert_raw_for_test(
136 &mut self,
137 key: RawDataStoreKey,
138 row: RawRow,
139 ) -> Option<RawRow> {
140 let previous_journaled = if matches!(self.backend, DataStoreBackend::Journaled { .. }) {
141 self.get(&key)
142 } else {
143 None
144 };
145 let cardinality_key = key.clone();
146 let previous = match &mut self.backend {
147 DataStoreBackend::Heap(map) => map.insert(key, row),
148 DataStoreBackend::Journaled {
149 live, tombstones, ..
150 } => {
151 tombstones.remove(&key);
152 live.insert(key, row);
153 previous_journaled
154 }
155 };
156 self.entity_cardinality
157 .apply_insert(&cardinality_key, previous.as_ref());
158 self.bump_generation();
159 previous
160 }
161
162 pub(in crate::db) fn remove(&mut self, key: &RawDataStoreKey) -> Option<RawRow> {
164 let previous_journaled = if matches!(self.backend, DataStoreBackend::Journaled { .. }) {
165 self.get(key)
166 } else {
167 None
168 };
169 let previous = match &mut self.backend {
170 DataStoreBackend::Heap(map) => map.remove(key),
171 DataStoreBackend::Journaled {
172 live, tombstones, ..
173 } => {
174 live.remove(key);
175 tombstones.insert(key.clone());
176 previous_journaled
177 }
178 };
179 self.entity_cardinality.apply_remove(key, previous.as_ref());
180 self.bump_generation();
181 previous
182 }
183
184 pub(in crate::db) fn reset_journaled_live_projection(
187 &mut self,
188 ) -> Result<(), crate::error::InternalError> {
189 let DataStoreBackend::Journaled {
190 live, tombstones, ..
191 } = &mut self.backend
192 else {
193 return Err(crate::error::InternalError::store_invariant());
194 };
195
196 live.clear();
197 tombstones.clear();
198 self.rebuild_entity_cardinality_from_entries();
199 self.bump_generation();
200
201 Ok(())
202 }
203
204 pub(in crate::db) fn apply_recovered_journal_put(
206 &mut self,
207 key: RawDataStoreKey,
208 row: RawRow,
209 ) -> Result<Option<RawRow>, crate::error::InternalError> {
210 let DataStoreBackend::Journaled {
211 canonical,
212 live,
213 tombstones,
214 } = &mut self.backend
215 else {
216 return Err(crate::error::InternalError::store_invariant());
217 };
218
219 let previous = if tombstones.contains(&key) {
220 None
221 } else {
222 live.get(&key).cloned().or_else(|| canonical.get(&key))
223 };
224 tombstones.remove(&key);
225 let cardinality_key = key.clone();
226 live.insert(key, row);
227 self.entity_cardinality
228 .apply_insert(&cardinality_key, previous.as_ref());
229 self.bump_generation();
230
231 Ok(previous)
232 }
233
234 pub(in crate::db) fn apply_recovered_journal_delete(
236 &mut self,
237 key: &RawDataStoreKey,
238 ) -> Result<Option<RawRow>, crate::error::InternalError> {
239 let DataStoreBackend::Journaled {
240 canonical,
241 live,
242 tombstones,
243 } = &mut self.backend
244 else {
245 return Err(crate::error::InternalError::store_invariant());
246 };
247
248 let previous = if tombstones.contains(key) {
249 None
250 } else {
251 live.get(key).cloned().or_else(|| canonical.get(key))
252 };
253 live.remove(key);
254 tombstones.insert(key.clone());
255 self.entity_cardinality.apply_remove(key, previous.as_ref());
256 self.bump_generation();
257
258 Ok(previous)
259 }
260
261 pub(in crate::db) fn fold_recovered_journal_put(
263 &mut self,
264 key: RawDataStoreKey,
265 row: RawRow,
266 ) -> Result<Option<RawRow>, crate::error::InternalError> {
267 let DataStoreBackend::Journaled {
268 canonical,
269 live,
270 tombstones,
271 } = &mut self.backend
272 else {
273 return Err(crate::error::InternalError::store_invariant());
274 };
275
276 let visible = !live.contains_key(&key) && !tombstones.contains(&key);
277 let cardinality_key = key.clone();
278 let previous = canonical.insert(key, row);
279 if visible {
280 self.entity_cardinality
281 .apply_insert(&cardinality_key, previous.as_ref());
282 }
283 self.bump_generation();
284
285 Ok(previous)
286 }
287
288 pub(in crate::db) fn fold_recovered_journal_delete(
290 &mut self,
291 key: &RawDataStoreKey,
292 ) -> Result<Option<RawRow>, crate::error::InternalError> {
293 let DataStoreBackend::Journaled {
294 canonical,
295 live,
296 tombstones,
297 } = &mut self.backend
298 else {
299 return Err(crate::error::InternalError::store_invariant());
300 };
301
302 let visible = !live.contains_key(key) && !tombstones.contains(key);
303 let previous = canonical.remove(key);
304 if visible {
305 self.entity_cardinality.apply_remove(key, previous.as_ref());
306 }
307 self.bump_generation();
308
309 Ok(previous)
310 }
311
312 pub(in crate::db) fn get(&self, key: &RawDataStoreKey) -> Option<RawRow> {
314 #[cfg(feature = "diagnostics")]
315 record_data_store_get_call();
316
317 match &self.backend {
318 DataStoreBackend::Heap(map) => map.get(key).cloned(),
319 DataStoreBackend::Journaled { .. } => Self::journaled_get_raw(&self.backend, key),
320 }
321 }
322
323 #[must_use]
325 pub(in crate::db) fn contains(&self, key: &RawDataStoreKey) -> bool {
326 match &self.backend {
327 DataStoreBackend::Heap(map) => map.contains_key(key),
328 DataStoreBackend::Journaled { .. } => {
329 Self::journaled_get_raw(&self.backend, key).is_some()
330 }
331 }
332 }
333
334 #[cfg(test)]
336 pub(in crate::db) fn clear(&mut self) {
337 match &mut self.backend {
338 DataStoreBackend::Heap(map) => map.clear(),
339 DataStoreBackend::Journaled {
340 canonical,
341 live,
342 tombstones,
343 } => {
344 canonical.clear_new();
345 live.clear();
346 tombstones.clear();
347 }
348 }
349 self.entity_cardinality.clear();
350 self.bump_generation();
351 }
352
353 #[must_use]
355 pub(in crate::db) fn len(&self) -> u64 {
356 match &self.backend {
357 DataStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
358 DataStoreBackend::Journaled { .. } => {
359 let mut count = 0_u64;
360 let _: Result<(), Infallible> = self.visit_entries(|_key, _row| {
361 count = count.saturating_add(1);
362 Ok(StoreVisit::Continue)
363 });
364 count
365 }
366 }
367 }
368
369 #[must_use]
371 pub(in crate::db) const fn generation(&self) -> u64 {
372 self.generation
373 }
374
375 #[must_use]
377 pub(in crate::db) fn exact_entity_count(&self, entity: EntityTag) -> Option<u64> {
378 self.entity_cardinality.exact_count(entity)
379 }
380
381 #[must_use]
383 #[cfg(test)]
384 pub(in crate::db) fn is_empty(&self) -> bool {
385 match &self.backend {
386 DataStoreBackend::Heap(map) => map.is_empty(),
387 DataStoreBackend::Journaled { .. } => {
388 let mut empty = true;
389 let _: Result<(), Infallible> = self.visit_entries(|_key, _row| {
390 empty = false;
391 Ok(StoreVisit::Stop)
392 });
393 empty
394 }
395 }
396 }
397
398 pub(in crate::db) fn visit_entries<E>(
400 &self,
401 mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
402 ) -> Result<(), E> {
403 match &self.backend {
404 DataStoreBackend::Heap(map) => {
405 for (key, row) in map {
406 if visitor(key, row)?.should_stop() {
407 break;
408 }
409 }
410 }
411 DataStoreBackend::Journaled {
412 canonical: _,
413 live: _,
414 tombstones: _,
415 } => Self::visit_journaled_entries_in_bounds(
416 &self.backend,
417 (Bound::Unbounded, Bound::Unbounded),
418 false,
419 visitor,
420 )?,
421 }
422
423 Ok(())
424 }
425
426 pub(in crate::db) fn visit_entries_rev<E>(
428 &self,
429 mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
430 ) -> Result<(), E> {
431 match &self.backend {
432 DataStoreBackend::Heap(map) => {
433 for (key, row) in map.iter().rev() {
434 if visitor(key, row)?.should_stop() {
435 break;
436 }
437 }
438 }
439 DataStoreBackend::Journaled {
440 canonical: _,
441 live: _,
442 tombstones: _,
443 } => Self::visit_journaled_entries_in_bounds(
444 &self.backend,
445 (Bound::Unbounded, Bound::Unbounded),
446 true,
447 visitor,
448 )?,
449 }
450
451 Ok(())
452 }
453
454 pub(in crate::db) fn visit_range<E>(
456 &self,
457 key_range: impl RangeBounds<RawDataStoreKey>,
458 mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
459 ) -> Result<(), E> {
460 let bounds = Self::owned_range_bounds(&key_range);
461 match &self.backend {
462 DataStoreBackend::Heap(map) => {
463 for (key, row) in map.range((bounds.0.clone(), bounds.1)) {
464 if visitor(key, row)?.should_stop() {
465 break;
466 }
467 }
468 }
469 DataStoreBackend::Journaled {
470 canonical: _,
471 live: _,
472 tombstones: _,
473 } => Self::visit_journaled_entries_in_bounds(&self.backend, bounds, false, visitor)?,
474 }
475
476 Ok(())
477 }
478
479 pub(in crate::db) fn visit_range_rev<E>(
481 &self,
482 key_range: impl RangeBounds<RawDataStoreKey>,
483 mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
484 ) -> Result<(), E> {
485 let bounds = Self::owned_range_bounds(&key_range);
486 match &self.backend {
487 DataStoreBackend::Heap(map) => {
488 for (key, row) in map.range((bounds.0.clone(), bounds.1)).rev() {
489 if visitor(key, row)?.should_stop() {
490 break;
491 }
492 }
493 }
494 DataStoreBackend::Journaled {
495 canonical: _,
496 live: _,
497 tombstones: _,
498 } => Self::visit_journaled_entries_in_bounds(&self.backend, bounds, true, visitor)?,
499 }
500
501 Ok(())
502 }
503
504 pub(in crate::db) fn visit_entity<E>(
506 &self,
507 entity: EntityTag,
508 visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
509 ) -> Result<(), E> {
510 let range = RawDataStoreKeyRange::entity_prefix(entity);
511 self.visit_range(RawDataStoreKey::store_range_bounds(&range), visitor)
512 }
513
514 pub(in crate::db) fn memory_bytes(&self) -> u64 {
516 let mut bytes = 0u64;
518 let _: Result<(), Infallible> = self.visit_entries(|key, row| {
519 bytes = bytes.saturating_add(key.as_bytes().len() as u64 + row.len() as u64);
520 Ok(StoreVisit::Continue)
521 });
522 bytes
523 }
524
525 const fn bump_generation(&mut self) {
526 self.generation = self.generation.saturating_add(1);
527 }
528
529 fn rebuild_entity_cardinality_from_entries(&mut self) {
530 let mut cardinality = EntityCardinality::empty();
531 let _: Result<(), Infallible> = self.visit_entries(|key, _row| {
532 cardinality.apply_present_key(key);
533 Ok(StoreVisit::Continue)
534 });
535 self.entity_cardinality = cardinality;
536 }
537
538 #[cfg(feature = "diagnostics")]
540 pub(in crate::db) fn current_get_call_count() -> u64 {
541 DATA_STORE_GET_CALL_COUNT.with(Cell::get)
542 }
543
544 #[cfg(test)]
545 #[must_use]
546 pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
547 match &self.backend {
548 DataStoreBackend::Journaled { canonical: map, .. } => map.len(),
549 DataStoreBackend::Heap(_) => 0,
550 }
551 }
552
553 fn journaled_get_raw(backend: &DataStoreBackend, key: &RawDataStoreKey) -> Option<RawRow> {
554 let DataStoreBackend::Journaled {
555 canonical,
556 live,
557 tombstones,
558 } = backend
559 else {
560 return None;
561 };
562
563 if tombstones.contains(key) {
564 return None;
565 }
566 live.get(key).cloned().or_else(|| canonical.get(key))
567 }
568
569 fn owned_range_bounds(
570 key_range: &impl RangeBounds<RawDataStoreKey>,
571 ) -> (Bound<RawDataStoreKey>, Bound<RawDataStoreKey>) {
572 let lower = match key_range.start_bound() {
573 Bound::Included(key) => Bound::Included(key.clone()),
574 Bound::Excluded(key) => Bound::Excluded(key.clone()),
575 Bound::Unbounded => Bound::Unbounded,
576 };
577 let upper = match key_range.end_bound() {
578 Bound::Included(key) => Bound::Included(key.clone()),
579 Bound::Excluded(key) => Bound::Excluded(key.clone()),
580 Bound::Unbounded => Bound::Unbounded,
581 };
582
583 (lower, upper)
584 }
585
586 fn visit_journaled_entries_in_bounds<E>(
587 backend: &DataStoreBackend,
588 bounds: (Bound<RawDataStoreKey>, Bound<RawDataStoreKey>),
589 reverse: bool,
590 mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
591 ) -> Result<(), E> {
592 let DataStoreBackend::Journaled {
593 canonical,
594 live,
595 tombstones,
596 } = backend
597 else {
598 return Ok(());
599 };
600
601 if canonical.is_empty() {
602 if reverse {
603 for (key, row) in live.range(bounds).rev() {
604 if visitor(key, row)?.should_stop() {
605 return Ok(());
606 }
607 }
608 } else {
609 for (key, row) in live.range(bounds) {
610 if visitor(key, row)?.should_stop() {
611 return Ok(());
612 }
613 }
614 }
615 return Ok(());
616 }
617
618 if live.is_empty() && tombstones.is_empty() {
619 if reverse {
620 for entry in canonical.range(bounds).rev() {
621 if visitor(entry.key(), &entry.value())?.should_stop() {
622 return Ok(());
623 }
624 }
625 } else {
626 for entry in canonical.range(bounds) {
627 if visitor(entry.key(), &entry.value())?.should_stop() {
628 return Ok(());
629 }
630 }
631 }
632 return Ok(());
633 }
634
635 match if reverse {
636 Direction::Desc
637 } else {
638 Direction::Asc
639 } {
640 Direction::Asc => visit_ordered_overlay(
641 canonical.range((bounds.0.clone(), bounds.1.clone())),
642 live.range((bounds.0, bounds.1)),
643 Direction::Asc,
644 |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
645 |canonical_entry| !tombstones.contains(canonical_entry.key()),
646 |live_entry| !tombstones.contains(live_entry.0),
647 |entry| {
648 let visit = match entry {
649 OrderedOverlayEntry::Canonical(canonical_entry) => {
650 visitor(canonical_entry.key(), &canonical_entry.value())?
651 }
652 OrderedOverlayEntry::Live((key, row)) => visitor(key, row)?,
653 };
654 Ok(if visit.should_stop() {
655 OrderedOverlayVisit::Stop
656 } else {
657 OrderedOverlayVisit::Continue
658 })
659 },
660 ),
661 Direction::Desc => visit_ordered_overlay(
662 canonical.range((bounds.0.clone(), bounds.1.clone())).rev(),
663 live.range((bounds.0, bounds.1)).rev(),
664 Direction::Desc,
665 |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
666 |canonical_entry| !tombstones.contains(canonical_entry.key()),
667 |live_entry| !tombstones.contains(live_entry.0),
668 |entry| {
669 let visit = match entry {
670 OrderedOverlayEntry::Canonical(canonical_entry) => {
671 visitor(canonical_entry.key(), &canonical_entry.value())?
672 }
673 OrderedOverlayEntry::Live((key, row)) => visitor(key, row)?,
674 };
675 Ok(if visit.should_stop() {
676 OrderedOverlayVisit::Stop
677 } else {
678 OrderedOverlayVisit::Continue
679 })
680 },
681 ),
682 }
683 }
684}
685
/// Running per-entity tally of visible rows.
#[derive(Clone, Debug)]
struct EntityCardinality {
    /// Row count per entity tag; entities whose count reaches zero are removed.
    counts: HeapBTreeMap<EntityTag, u64>,
    /// `false` once a key without an entity-tag prefix has been seen; counts
    /// are then withheld from callers.
    decodable: bool,
}
691
692impl EntityCardinality {
693 const fn empty() -> Self {
694 Self {
695 counts: HeapBTreeMap::new(),
696 decodable: true,
697 }
698 }
699
700 fn exact_count(&self, entity: EntityTag) -> Option<u64> {
701 self.decodable
702 .then(|| self.counts.get(&entity).copied().unwrap_or(0))
703 }
704
705 #[cfg(test)]
706 fn clear(&mut self) {
707 self.counts.clear();
708 self.decodable = true;
709 }
710
711 fn apply_insert(&mut self, key: &RawDataStoreKey, previous: Option<&RawRow>) {
712 if previous.is_some() {
713 return;
714 }
715 self.apply_present_key(key);
716 }
717
718 fn apply_remove(&mut self, key: &RawDataStoreKey, previous: Option<&RawRow>) {
719 if previous.is_none() {
720 return;
721 }
722 self.apply_removed_key(key);
723 }
724
725 fn apply_present_key(&mut self, key: &RawDataStoreKey) {
726 if !self.decodable {
727 return;
728 }
729 let Some(entity) = key.entity_tag_prefix() else {
730 self.invalidate();
731 return;
732 };
733
734 let count = self.counts.entry(entity).or_insert(0);
735 *count = count.saturating_add(1);
736 }
737
738 fn apply_removed_key(&mut self, key: &RawDataStoreKey) {
739 if !self.decodable {
740 return;
741 }
742 let Some(entity) = key.entity_tag_prefix() else {
743 self.invalidate();
744 return;
745 };
746
747 if let Some(count) = self.counts.get_mut(&entity) {
748 *count = count.saturating_sub(1);
749 if *count == 0 {
750 self.counts.remove(&entity);
751 }
752 }
753 }
754
755 fn invalidate(&mut self) {
756 self.counts.clear();
757 self.decodable = false;
758 }
759}
760
761#[cfg(test)]
762mod tests;