1use crate::{
7 db::{
8 data::{CanonicalRow, RawDataStoreKey, RawRow},
9 direction::Direction,
10 key_taxonomy::RawDataStoreKeyRange,
11 ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
12 },
13 types::EntityTag,
14};
15use ic_memory::stable_structures::{
16 BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
17};
18#[cfg(feature = "diagnostics")]
19use std::cell::Cell;
20use std::collections::{BTreeMap as HeapBTreeMap, BTreeSet};
21use std::convert::Infallible;
22use std::ops::{Bound, RangeBounds};
23
24#[cfg(feature = "diagnostics")]
25thread_local! {
26 static DATA_STORE_GET_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
27}
28
29#[cfg(feature = "diagnostics")]
30fn record_data_store_get_call() {
31 DATA_STORE_GET_CALL_COUNT.with(|count| {
32 count.set(count.get().saturating_add(1));
33 });
34}
35
36pub struct DataStore {
46 backend: DataStoreBackend,
47}
48
49enum DataStoreBackend {
50 Stable(StableBTreeMap<RawDataStoreKey, RawRow, VirtualMemory<DefaultMemoryImpl>>),
51 Heap(HeapBTreeMap<RawDataStoreKey, RawRow>),
52 Journaled {
53 canonical: StableBTreeMap<RawDataStoreKey, RawRow, VirtualMemory<DefaultMemoryImpl>>,
54 live: HeapBTreeMap<RawDataStoreKey, RawRow>,
55 tombstones: BTreeSet<RawDataStoreKey>,
56 },
57}
58
59#[derive(Clone, Copy, Debug, Eq, PartialEq)]
61pub(in crate::db) enum StoreVisit {
62 Continue,
63 Stop,
64}
65
66impl StoreVisit {
67 const fn should_stop(self) -> bool {
68 matches!(self, Self::Stop)
69 }
70}
71
72impl DataStore {
73 #[must_use]
75 pub fn init(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
76 Self {
77 backend: DataStoreBackend::Stable(StableBTreeMap::init(memory)),
78 }
79 }
80
81 #[must_use]
83 pub const fn init_heap() -> Self {
84 Self {
85 backend: DataStoreBackend::Heap(HeapBTreeMap::new()),
86 }
87 }
88
89 #[must_use]
95 pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
96 Self {
97 backend: DataStoreBackend::Journaled {
98 canonical: StableBTreeMap::init(memory),
99 live: HeapBTreeMap::new(),
100 tombstones: BTreeSet::new(),
101 },
102 }
103 }
104
105 pub(in crate::db) fn insert(
107 &mut self,
108 key: RawDataStoreKey,
109 row: CanonicalRow,
110 ) -> Option<RawRow> {
111 let row = row.into_raw_row();
112 let previous_journaled = if matches!(self.backend, DataStoreBackend::Journaled { .. }) {
113 self.get(&key)
114 } else {
115 None
116 };
117 match &mut self.backend {
118 DataStoreBackend::Stable(map) => map.insert(key, row),
119 DataStoreBackend::Heap(map) => map.insert(key, row),
120 DataStoreBackend::Journaled {
121 live, tombstones, ..
122 } => {
123 tombstones.remove(&key);
124 live.insert(key, row);
125 previous_journaled
126 }
127 }
128 }
129
130 #[cfg(test)]
132 pub(in crate::db) fn insert_raw_for_test(
133 &mut self,
134 key: RawDataStoreKey,
135 row: RawRow,
136 ) -> Option<RawRow> {
137 let previous_journaled = if matches!(self.backend, DataStoreBackend::Journaled { .. }) {
138 self.get(&key)
139 } else {
140 None
141 };
142 match &mut self.backend {
143 DataStoreBackend::Stable(map) => map.insert(key, row),
144 DataStoreBackend::Heap(map) => map.insert(key, row),
145 DataStoreBackend::Journaled {
146 live, tombstones, ..
147 } => {
148 tombstones.remove(&key);
149 live.insert(key, row);
150 previous_journaled
151 }
152 }
153 }
154
155 pub(in crate::db) fn remove(&mut self, key: &RawDataStoreKey) -> Option<RawRow> {
157 let previous_journaled = if matches!(self.backend, DataStoreBackend::Journaled { .. }) {
158 self.get(key)
159 } else {
160 None
161 };
162 match &mut self.backend {
163 DataStoreBackend::Stable(map) => map.remove(key),
164 DataStoreBackend::Heap(map) => map.remove(key),
165 DataStoreBackend::Journaled {
166 live, tombstones, ..
167 } => {
168 live.remove(key);
169 tombstones.insert(key.clone());
170 previous_journaled
171 }
172 }
173 }
174
175 pub(in crate::db) fn reset_journaled_live_projection(
178 &mut self,
179 ) -> Result<(), crate::error::InternalError> {
180 let DataStoreBackend::Journaled {
181 live, tombstones, ..
182 } = &mut self.backend
183 else {
184 return Err(crate::error::InternalError::store_invariant());
185 };
186
187 live.clear();
188 tombstones.clear();
189
190 Ok(())
191 }
192
193 pub(in crate::db) fn apply_recovered_journal_put(
195 &mut self,
196 key: RawDataStoreKey,
197 row: RawRow,
198 ) -> Result<Option<RawRow>, crate::error::InternalError> {
199 let DataStoreBackend::Journaled {
200 canonical,
201 live,
202 tombstones,
203 } = &mut self.backend
204 else {
205 return Err(crate::error::InternalError::store_invariant());
206 };
207
208 let previous = if tombstones.contains(&key) {
209 None
210 } else {
211 live.get(&key).cloned().or_else(|| canonical.get(&key))
212 };
213 tombstones.remove(&key);
214 live.insert(key, row);
215
216 Ok(previous)
217 }
218
219 pub(in crate::db) fn apply_recovered_journal_delete(
221 &mut self,
222 key: &RawDataStoreKey,
223 ) -> Result<Option<RawRow>, crate::error::InternalError> {
224 let DataStoreBackend::Journaled {
225 canonical,
226 live,
227 tombstones,
228 } = &mut self.backend
229 else {
230 return Err(crate::error::InternalError::store_invariant());
231 };
232
233 let previous = if tombstones.contains(key) {
234 None
235 } else {
236 live.get(key).cloned().or_else(|| canonical.get(key))
237 };
238 live.remove(key);
239 tombstones.insert(key.clone());
240
241 Ok(previous)
242 }
243
244 pub(in crate::db) fn fold_recovered_journal_put(
246 &mut self,
247 key: RawDataStoreKey,
248 row: RawRow,
249 ) -> Result<Option<RawRow>, crate::error::InternalError> {
250 let DataStoreBackend::Journaled { canonical, .. } = &mut self.backend else {
251 return Err(crate::error::InternalError::store_invariant());
252 };
253
254 Ok(canonical.insert(key, row))
255 }
256
257 pub(in crate::db) fn fold_recovered_journal_delete(
259 &mut self,
260 key: &RawDataStoreKey,
261 ) -> Result<Option<RawRow>, crate::error::InternalError> {
262 let DataStoreBackend::Journaled { canonical, .. } = &mut self.backend else {
263 return Err(crate::error::InternalError::store_invariant());
264 };
265
266 Ok(canonical.remove(key))
267 }
268
269 pub(in crate::db) fn get(&self, key: &RawDataStoreKey) -> Option<RawRow> {
271 #[cfg(feature = "diagnostics")]
272 record_data_store_get_call();
273
274 match &self.backend {
275 DataStoreBackend::Stable(map) => map.get(key),
276 DataStoreBackend::Heap(map) => map.get(key).cloned(),
277 DataStoreBackend::Journaled { .. } => Self::journaled_get_raw(&self.backend, key),
278 }
279 }
280
281 #[must_use]
283 pub(in crate::db) fn contains(&self, key: &RawDataStoreKey) -> bool {
284 match &self.backend {
285 DataStoreBackend::Stable(map) => map.contains_key(key),
286 DataStoreBackend::Heap(map) => map.contains_key(key),
287 DataStoreBackend::Journaled { .. } => {
288 Self::journaled_get_raw(&self.backend, key).is_some()
289 }
290 }
291 }
292
293 #[cfg(test)]
295 pub(in crate::db) fn clear(&mut self) {
296 match &mut self.backend {
297 DataStoreBackend::Stable(map) => map.clear_new(),
298 DataStoreBackend::Heap(map) => map.clear(),
299 DataStoreBackend::Journaled {
300 canonical,
301 live,
302 tombstones,
303 } => {
304 live.clear();
305 tombstones.clear();
306 for entry in canonical.iter() {
307 tombstones.insert(entry.key().clone());
308 }
309 }
310 }
311 }
312
313 #[must_use]
315 pub(in crate::db) fn len(&self) -> u64 {
316 match &self.backend {
317 DataStoreBackend::Stable(map) => map.len(),
318 DataStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
319 DataStoreBackend::Journaled { .. } => {
320 let mut count = 0_u64;
321 let _: Result<(), Infallible> = self.visit_entries(|_key, _row| {
322 count = count.saturating_add(1);
323 Ok(StoreVisit::Continue)
324 });
325 count
326 }
327 }
328 }
329
330 #[must_use]
332 #[cfg(test)]
333 pub(in crate::db) fn is_empty(&self) -> bool {
334 match &self.backend {
335 DataStoreBackend::Stable(map) => map.is_empty(),
336 DataStoreBackend::Heap(map) => map.is_empty(),
337 DataStoreBackend::Journaled { .. } => {
338 let mut empty = true;
339 let _: Result<(), Infallible> = self.visit_entries(|_key, _row| {
340 empty = false;
341 Ok(StoreVisit::Stop)
342 });
343 empty
344 }
345 }
346 }
347
348 pub(in crate::db) fn visit_entries<E>(
350 &self,
351 mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
352 ) -> Result<(), E> {
353 match &self.backend {
354 DataStoreBackend::Stable(map) => {
355 for entry in map.iter() {
356 if visitor(entry.key(), &entry.value())?.should_stop() {
357 break;
358 }
359 }
360 }
361 DataStoreBackend::Heap(map) => {
362 for (key, row) in map {
363 if visitor(key, row)?.should_stop() {
364 break;
365 }
366 }
367 }
368 DataStoreBackend::Journaled {
369 canonical: _,
370 live: _,
371 tombstones: _,
372 } => Self::visit_journaled_entries_in_bounds(
373 &self.backend,
374 (Bound::Unbounded, Bound::Unbounded),
375 false,
376 visitor,
377 )?,
378 }
379
380 Ok(())
381 }
382
383 pub(in crate::db) fn visit_entries_rev<E>(
385 &self,
386 mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
387 ) -> Result<(), E> {
388 match &self.backend {
389 DataStoreBackend::Stable(map) => {
390 for entry in map.iter().rev() {
391 if visitor(entry.key(), &entry.value())?.should_stop() {
392 break;
393 }
394 }
395 }
396 DataStoreBackend::Heap(map) => {
397 for (key, row) in map.iter().rev() {
398 if visitor(key, row)?.should_stop() {
399 break;
400 }
401 }
402 }
403 DataStoreBackend::Journaled {
404 canonical: _,
405 live: _,
406 tombstones: _,
407 } => Self::visit_journaled_entries_in_bounds(
408 &self.backend,
409 (Bound::Unbounded, Bound::Unbounded),
410 true,
411 visitor,
412 )?,
413 }
414
415 Ok(())
416 }
417
418 pub(in crate::db) fn visit_range<E>(
420 &self,
421 key_range: impl RangeBounds<RawDataStoreKey>,
422 mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
423 ) -> Result<(), E> {
424 let bounds = Self::owned_range_bounds(&key_range);
425 match &self.backend {
426 DataStoreBackend::Stable(map) => {
427 for entry in map.range((bounds.0.clone(), bounds.1)) {
428 if visitor(entry.key(), &entry.value())?.should_stop() {
429 break;
430 }
431 }
432 }
433 DataStoreBackend::Heap(map) => {
434 for (key, row) in map.range((bounds.0.clone(), bounds.1)) {
435 if visitor(key, row)?.should_stop() {
436 break;
437 }
438 }
439 }
440 DataStoreBackend::Journaled {
441 canonical: _,
442 live: _,
443 tombstones: _,
444 } => Self::visit_journaled_entries_in_bounds(&self.backend, bounds, false, visitor)?,
445 }
446
447 Ok(())
448 }
449
450 pub(in crate::db) fn visit_range_rev<E>(
452 &self,
453 key_range: impl RangeBounds<RawDataStoreKey>,
454 mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
455 ) -> Result<(), E> {
456 let bounds = Self::owned_range_bounds(&key_range);
457 match &self.backend {
458 DataStoreBackend::Stable(map) => {
459 for entry in map.range((bounds.0.clone(), bounds.1)).rev() {
460 if visitor(entry.key(), &entry.value())?.should_stop() {
461 break;
462 }
463 }
464 }
465 DataStoreBackend::Heap(map) => {
466 for (key, row) in map.range((bounds.0.clone(), bounds.1)).rev() {
467 if visitor(key, row)?.should_stop() {
468 break;
469 }
470 }
471 }
472 DataStoreBackend::Journaled {
473 canonical: _,
474 live: _,
475 tombstones: _,
476 } => Self::visit_journaled_entries_in_bounds(&self.backend, bounds, true, visitor)?,
477 }
478
479 Ok(())
480 }
481
482 pub(in crate::db) fn visit_entity<E>(
484 &self,
485 entity: EntityTag,
486 visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
487 ) -> Result<(), E> {
488 let range = RawDataStoreKeyRange::entity_prefix(entity);
489 self.visit_range(RawDataStoreKey::store_range_bounds(&range), visitor)
490 }
491
492 pub(in crate::db) fn memory_bytes(&self) -> u64 {
494 let mut bytes = 0u64;
496 let _: Result<(), Infallible> = self.visit_entries(|key, row| {
497 bytes = bytes.saturating_add(key.as_bytes().len() as u64 + row.len() as u64);
498 Ok(StoreVisit::Continue)
499 });
500 bytes
501 }
502
503 #[cfg(feature = "diagnostics")]
505 pub(in crate::db) fn current_get_call_count() -> u64 {
506 DATA_STORE_GET_CALL_COUNT.with(Cell::get)
507 }
508
509 #[cfg(test)]
510 #[must_use]
511 pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
512 match &self.backend {
513 DataStoreBackend::Stable(map) | DataStoreBackend::Journaled { canonical: map, .. } => {
514 map.len()
515 }
516 DataStoreBackend::Heap(_) => 0,
517 }
518 }
519
520 fn journaled_get_raw(backend: &DataStoreBackend, key: &RawDataStoreKey) -> Option<RawRow> {
521 let DataStoreBackend::Journaled {
522 canonical,
523 live,
524 tombstones,
525 } = backend
526 else {
527 return None;
528 };
529
530 if tombstones.contains(key) {
531 return None;
532 }
533 live.get(key).cloned().or_else(|| canonical.get(key))
534 }
535
536 fn owned_range_bounds(
537 key_range: &impl RangeBounds<RawDataStoreKey>,
538 ) -> (Bound<RawDataStoreKey>, Bound<RawDataStoreKey>) {
539 let lower = match key_range.start_bound() {
540 Bound::Included(key) => Bound::Included(key.clone()),
541 Bound::Excluded(key) => Bound::Excluded(key.clone()),
542 Bound::Unbounded => Bound::Unbounded,
543 };
544 let upper = match key_range.end_bound() {
545 Bound::Included(key) => Bound::Included(key.clone()),
546 Bound::Excluded(key) => Bound::Excluded(key.clone()),
547 Bound::Unbounded => Bound::Unbounded,
548 };
549
550 (lower, upper)
551 }
552
553 fn visit_journaled_entries_in_bounds<E>(
554 backend: &DataStoreBackend,
555 bounds: (Bound<RawDataStoreKey>, Bound<RawDataStoreKey>),
556 reverse: bool,
557 mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
558 ) -> Result<(), E> {
559 let DataStoreBackend::Journaled {
560 canonical,
561 live,
562 tombstones,
563 } = backend
564 else {
565 return Ok(());
566 };
567
568 if canonical.is_empty() {
569 if reverse {
570 for (key, row) in live.range(bounds).rev() {
571 if visitor(key, row)?.should_stop() {
572 return Ok(());
573 }
574 }
575 } else {
576 for (key, row) in live.range(bounds) {
577 if visitor(key, row)?.should_stop() {
578 return Ok(());
579 }
580 }
581 }
582 return Ok(());
583 }
584
585 if live.is_empty() && tombstones.is_empty() {
586 if reverse {
587 for entry in canonical.range(bounds).rev() {
588 if visitor(entry.key(), &entry.value())?.should_stop() {
589 return Ok(());
590 }
591 }
592 } else {
593 for entry in canonical.range(bounds) {
594 if visitor(entry.key(), &entry.value())?.should_stop() {
595 return Ok(());
596 }
597 }
598 }
599 return Ok(());
600 }
601
602 match if reverse {
603 Direction::Desc
604 } else {
605 Direction::Asc
606 } {
607 Direction::Asc => visit_ordered_overlay(
608 canonical.range((bounds.0.clone(), bounds.1.clone())),
609 live.range((bounds.0, bounds.1)),
610 Direction::Asc,
611 |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
612 |canonical_entry| !tombstones.contains(canonical_entry.key()),
613 |live_entry| !tombstones.contains(live_entry.0),
614 |entry| {
615 let visit = match entry {
616 OrderedOverlayEntry::Canonical(canonical_entry) => {
617 visitor(canonical_entry.key(), &canonical_entry.value())?
618 }
619 OrderedOverlayEntry::Live((key, row)) => visitor(key, row)?,
620 };
621 Ok(if visit.should_stop() {
622 OrderedOverlayVisit::Stop
623 } else {
624 OrderedOverlayVisit::Continue
625 })
626 },
627 ),
628 Direction::Desc => visit_ordered_overlay(
629 canonical.range((bounds.0.clone(), bounds.1.clone())).rev(),
630 live.range((bounds.0, bounds.1)).rev(),
631 Direction::Desc,
632 |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
633 |canonical_entry| !tombstones.contains(canonical_entry.key()),
634 |live_entry| !tombstones.contains(live_entry.0),
635 |entry| {
636 let visit = match entry {
637 OrderedOverlayEntry::Canonical(canonical_entry) => {
638 visitor(canonical_entry.key(), &canonical_entry.value())?
639 }
640 OrderedOverlayEntry::Live((key, row)) => visitor(key, row)?,
641 };
642 Ok(if visit.should_stop() {
643 OrderedOverlayVisit::Stop
644 } else {
645 OrderedOverlayVisit::Continue
646 })
647 },
648 ),
649 }
650 }
651}
652
653#[cfg(test)]
654mod tests;