1use crate::{
7 db::{
8 data::{CanonicalRow, RawDataStoreKey, RawRow},
9 direction::Direction,
10 key_taxonomy::RawDataStoreKeyRange,
11 ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
12 },
13 types::EntityTag,
14};
15use ic_memory::stable_structures::{
16 BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
17};
18#[cfg(feature = "diagnostics")]
19use std::cell::Cell;
20use std::collections::{BTreeMap as HeapBTreeMap, BTreeSet};
21use std::convert::Infallible;
22use std::ops::{Bound, RangeBounds};
23
// Per-thread tally of `DataStore::get` invocations, starting at zero.
// Only compiled when the `diagnostics` feature is enabled.
#[cfg(feature = "diagnostics")]
thread_local! {
    static DATA_STORE_GET_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
}
28
/// Adds one to this thread's `DATA_STORE_GET_CALL_COUNT`, saturating at `u64::MAX`.
#[cfg(feature = "diagnostics")]
fn record_data_store_get_call() {
    DATA_STORE_GET_CALL_COUNT.with(|calls| {
        let next = calls.get().saturating_add(1);
        calls.set(next);
    });
}
35
/// Ordered key/row store with a selectable backend and a mutation counter.
pub struct DataStore {
    /// Where the rows are actually kept (heap-only or journaled).
    backend: DataStoreBackend,
    /// Counter advanced (saturating) by the mutating methods of this store.
    generation: u64,
}
49
/// Storage strategy behind a `DataStore`.
enum DataStoreBackend {
    /// Every row lives in a single heap-allocated ordered map.
    Heap(HeapBTreeMap<RawDataStoreKey, RawRow>),
    /// A stable-memory map combined with a heap overlay and a tombstone set.
    Journaled {
        /// Rows held in the stable B-tree. In this file it is mutated only by
        /// the `fold_recovered_journal_*` methods and the test-only `clear`.
        canonical: StableBTreeMap<RawDataStoreKey, RawRow, VirtualMemory<DefaultMemoryImpl>>,
        /// Heap rows that take precedence over `canonical` on lookup.
        live: HeapBTreeMap<RawDataStoreKey, RawRow>,
        /// Keys that lookups report as absent, whatever `canonical` holds.
        tombstones: BTreeSet<RawDataStoreKey>,
    },
}
58
/// Signal returned by a visitor callback to steer a store traversal.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) enum StoreVisit {
    /// Proceed to the next entry.
    Continue,
    /// End the traversal; no further entries are passed to the visitor.
    Stop,
}
65
66impl StoreVisit {
67 const fn should_stop(self) -> bool {
68 matches!(self, Self::Stop)
69 }
70}
71
72impl DataStore {
73 #[must_use]
75 pub const fn init_heap() -> Self {
76 Self {
77 backend: DataStoreBackend::Heap(HeapBTreeMap::new()),
78 generation: 0,
79 }
80 }
81
82 #[must_use]
88 pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
89 Self {
90 backend: DataStoreBackend::Journaled {
91 canonical: StableBTreeMap::init(memory),
92 live: HeapBTreeMap::new(),
93 tombstones: BTreeSet::new(),
94 },
95 generation: 0,
96 }
97 }
98
99 pub(in crate::db) fn insert(
101 &mut self,
102 key: RawDataStoreKey,
103 row: CanonicalRow,
104 ) -> Option<RawRow> {
105 let row = row.into_raw_row();
106 let previous_journaled = if matches!(self.backend, DataStoreBackend::Journaled { .. }) {
107 self.get(&key)
108 } else {
109 None
110 };
111 let previous = match &mut self.backend {
112 DataStoreBackend::Heap(map) => map.insert(key, row),
113 DataStoreBackend::Journaled {
114 live, tombstones, ..
115 } => {
116 tombstones.remove(&key);
117 live.insert(key, row);
118 previous_journaled
119 }
120 };
121 self.bump_generation();
122 previous
123 }
124
125 #[cfg(test)]
127 pub(in crate::db) fn insert_raw_for_test(
128 &mut self,
129 key: RawDataStoreKey,
130 row: RawRow,
131 ) -> Option<RawRow> {
132 let previous_journaled = if matches!(self.backend, DataStoreBackend::Journaled { .. }) {
133 self.get(&key)
134 } else {
135 None
136 };
137 let previous = match &mut self.backend {
138 DataStoreBackend::Heap(map) => map.insert(key, row),
139 DataStoreBackend::Journaled {
140 live, tombstones, ..
141 } => {
142 tombstones.remove(&key);
143 live.insert(key, row);
144 previous_journaled
145 }
146 };
147 self.bump_generation();
148 previous
149 }
150
151 pub(in crate::db) fn remove(&mut self, key: &RawDataStoreKey) -> Option<RawRow> {
153 let previous_journaled = if matches!(self.backend, DataStoreBackend::Journaled { .. }) {
154 self.get(key)
155 } else {
156 None
157 };
158 let previous = match &mut self.backend {
159 DataStoreBackend::Heap(map) => map.remove(key),
160 DataStoreBackend::Journaled {
161 live, tombstones, ..
162 } => {
163 live.remove(key);
164 tombstones.insert(key.clone());
165 previous_journaled
166 }
167 };
168 self.bump_generation();
169 previous
170 }
171
172 pub(in crate::db) fn reset_journaled_live_projection(
175 &mut self,
176 ) -> Result<(), crate::error::InternalError> {
177 let DataStoreBackend::Journaled {
178 live, tombstones, ..
179 } = &mut self.backend
180 else {
181 return Err(crate::error::InternalError::store_invariant());
182 };
183
184 live.clear();
185 tombstones.clear();
186 self.bump_generation();
187
188 Ok(())
189 }
190
191 pub(in crate::db) fn apply_recovered_journal_put(
193 &mut self,
194 key: RawDataStoreKey,
195 row: RawRow,
196 ) -> Result<Option<RawRow>, crate::error::InternalError> {
197 let DataStoreBackend::Journaled {
198 canonical,
199 live,
200 tombstones,
201 } = &mut self.backend
202 else {
203 return Err(crate::error::InternalError::store_invariant());
204 };
205
206 let previous = if tombstones.contains(&key) {
207 None
208 } else {
209 live.get(&key).cloned().or_else(|| canonical.get(&key))
210 };
211 tombstones.remove(&key);
212 live.insert(key, row);
213 self.bump_generation();
214
215 Ok(previous)
216 }
217
218 pub(in crate::db) fn apply_recovered_journal_delete(
220 &mut self,
221 key: &RawDataStoreKey,
222 ) -> Result<Option<RawRow>, crate::error::InternalError> {
223 let DataStoreBackend::Journaled {
224 canonical,
225 live,
226 tombstones,
227 } = &mut self.backend
228 else {
229 return Err(crate::error::InternalError::store_invariant());
230 };
231
232 let previous = if tombstones.contains(key) {
233 None
234 } else {
235 live.get(key).cloned().or_else(|| canonical.get(key))
236 };
237 live.remove(key);
238 tombstones.insert(key.clone());
239 self.bump_generation();
240
241 Ok(previous)
242 }
243
244 pub(in crate::db) fn fold_recovered_journal_put(
246 &mut self,
247 key: RawDataStoreKey,
248 row: RawRow,
249 ) -> Result<Option<RawRow>, crate::error::InternalError> {
250 let DataStoreBackend::Journaled { canonical, .. } = &mut self.backend else {
251 return Err(crate::error::InternalError::store_invariant());
252 };
253
254 let previous = canonical.insert(key, row);
255 self.bump_generation();
256
257 Ok(previous)
258 }
259
260 pub(in crate::db) fn fold_recovered_journal_delete(
262 &mut self,
263 key: &RawDataStoreKey,
264 ) -> Result<Option<RawRow>, crate::error::InternalError> {
265 let DataStoreBackend::Journaled { canonical, .. } = &mut self.backend else {
266 return Err(crate::error::InternalError::store_invariant());
267 };
268
269 let previous = canonical.remove(key);
270 self.bump_generation();
271
272 Ok(previous)
273 }
274
275 pub(in crate::db) fn get(&self, key: &RawDataStoreKey) -> Option<RawRow> {
277 #[cfg(feature = "diagnostics")]
278 record_data_store_get_call();
279
280 match &self.backend {
281 DataStoreBackend::Heap(map) => map.get(key).cloned(),
282 DataStoreBackend::Journaled { .. } => Self::journaled_get_raw(&self.backend, key),
283 }
284 }
285
286 #[must_use]
288 pub(in crate::db) fn contains(&self, key: &RawDataStoreKey) -> bool {
289 match &self.backend {
290 DataStoreBackend::Heap(map) => map.contains_key(key),
291 DataStoreBackend::Journaled { .. } => {
292 Self::journaled_get_raw(&self.backend, key).is_some()
293 }
294 }
295 }
296
297 #[cfg(test)]
299 pub(in crate::db) fn clear(&mut self) {
300 match &mut self.backend {
301 DataStoreBackend::Heap(map) => map.clear(),
302 DataStoreBackend::Journaled {
303 canonical,
304 live,
305 tombstones,
306 } => {
307 canonical.clear_new();
308 live.clear();
309 tombstones.clear();
310 }
311 }
312 self.bump_generation();
313 }
314
315 #[must_use]
317 pub(in crate::db) fn len(&self) -> u64 {
318 match &self.backend {
319 DataStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
320 DataStoreBackend::Journaled { .. } => {
321 let mut count = 0_u64;
322 let _: Result<(), Infallible> = self.visit_entries(|_key, _row| {
323 count = count.saturating_add(1);
324 Ok(StoreVisit::Continue)
325 });
326 count
327 }
328 }
329 }
330
    /// Returns the current value of the store's generation counter.
    #[must_use]
    pub(in crate::db) const fn generation(&self) -> u64 {
        self.generation
    }
336
337 #[must_use]
339 #[cfg(test)]
340 pub(in crate::db) fn is_empty(&self) -> bool {
341 match &self.backend {
342 DataStoreBackend::Heap(map) => map.is_empty(),
343 DataStoreBackend::Journaled { .. } => {
344 let mut empty = true;
345 let _: Result<(), Infallible> = self.visit_entries(|_key, _row| {
346 empty = false;
347 Ok(StoreVisit::Stop)
348 });
349 empty
350 }
351 }
352 }
353
354 pub(in crate::db) fn visit_entries<E>(
356 &self,
357 mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
358 ) -> Result<(), E> {
359 match &self.backend {
360 DataStoreBackend::Heap(map) => {
361 for (key, row) in map {
362 if visitor(key, row)?.should_stop() {
363 break;
364 }
365 }
366 }
367 DataStoreBackend::Journaled {
368 canonical: _,
369 live: _,
370 tombstones: _,
371 } => Self::visit_journaled_entries_in_bounds(
372 &self.backend,
373 (Bound::Unbounded, Bound::Unbounded),
374 false,
375 visitor,
376 )?,
377 }
378
379 Ok(())
380 }
381
382 pub(in crate::db) fn visit_entries_rev<E>(
384 &self,
385 mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
386 ) -> Result<(), E> {
387 match &self.backend {
388 DataStoreBackend::Heap(map) => {
389 for (key, row) in map.iter().rev() {
390 if visitor(key, row)?.should_stop() {
391 break;
392 }
393 }
394 }
395 DataStoreBackend::Journaled {
396 canonical: _,
397 live: _,
398 tombstones: _,
399 } => Self::visit_journaled_entries_in_bounds(
400 &self.backend,
401 (Bound::Unbounded, Bound::Unbounded),
402 true,
403 visitor,
404 )?,
405 }
406
407 Ok(())
408 }
409
410 pub(in crate::db) fn visit_range<E>(
412 &self,
413 key_range: impl RangeBounds<RawDataStoreKey>,
414 mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
415 ) -> Result<(), E> {
416 let bounds = Self::owned_range_bounds(&key_range);
417 match &self.backend {
418 DataStoreBackend::Heap(map) => {
419 for (key, row) in map.range((bounds.0.clone(), bounds.1)) {
420 if visitor(key, row)?.should_stop() {
421 break;
422 }
423 }
424 }
425 DataStoreBackend::Journaled {
426 canonical: _,
427 live: _,
428 tombstones: _,
429 } => Self::visit_journaled_entries_in_bounds(&self.backend, bounds, false, visitor)?,
430 }
431
432 Ok(())
433 }
434
435 pub(in crate::db) fn visit_range_rev<E>(
437 &self,
438 key_range: impl RangeBounds<RawDataStoreKey>,
439 mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
440 ) -> Result<(), E> {
441 let bounds = Self::owned_range_bounds(&key_range);
442 match &self.backend {
443 DataStoreBackend::Heap(map) => {
444 for (key, row) in map.range((bounds.0.clone(), bounds.1)).rev() {
445 if visitor(key, row)?.should_stop() {
446 break;
447 }
448 }
449 }
450 DataStoreBackend::Journaled {
451 canonical: _,
452 live: _,
453 tombstones: _,
454 } => Self::visit_journaled_entries_in_bounds(&self.backend, bounds, true, visitor)?,
455 }
456
457 Ok(())
458 }
459
460 pub(in crate::db) fn visit_entity<E>(
462 &self,
463 entity: EntityTag,
464 visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
465 ) -> Result<(), E> {
466 let range = RawDataStoreKeyRange::entity_prefix(entity);
467 self.visit_range(RawDataStoreKey::store_range_bounds(&range), visitor)
468 }
469
470 pub(in crate::db) fn memory_bytes(&self) -> u64 {
472 let mut bytes = 0u64;
474 let _: Result<(), Infallible> = self.visit_entries(|key, row| {
475 bytes = bytes.saturating_add(key.as_bytes().len() as u64 + row.len() as u64);
476 Ok(StoreVisit::Continue)
477 });
478 bytes
479 }
480
    /// Advances the generation counter by one, saturating at `u64::MAX`
    /// rather than wrapping.
    const fn bump_generation(&mut self) {
        self.generation = self.generation.saturating_add(1);
    }
484
485 #[cfg(feature = "diagnostics")]
487 pub(in crate::db) fn current_get_call_count() -> u64 {
488 DATA_STORE_GET_CALL_COUNT.with(Cell::get)
489 }
490
491 #[cfg(test)]
492 #[must_use]
493 pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
494 match &self.backend {
495 DataStoreBackend::Journaled { canonical: map, .. } => map.len(),
496 DataStoreBackend::Heap(_) => 0,
497 }
498 }
499
500 fn journaled_get_raw(backend: &DataStoreBackend, key: &RawDataStoreKey) -> Option<RawRow> {
501 let DataStoreBackend::Journaled {
502 canonical,
503 live,
504 tombstones,
505 } = backend
506 else {
507 return None;
508 };
509
510 if tombstones.contains(key) {
511 return None;
512 }
513 live.get(key).cloned().or_else(|| canonical.get(key))
514 }
515
516 fn owned_range_bounds(
517 key_range: &impl RangeBounds<RawDataStoreKey>,
518 ) -> (Bound<RawDataStoreKey>, Bound<RawDataStoreKey>) {
519 let lower = match key_range.start_bound() {
520 Bound::Included(key) => Bound::Included(key.clone()),
521 Bound::Excluded(key) => Bound::Excluded(key.clone()),
522 Bound::Unbounded => Bound::Unbounded,
523 };
524 let upper = match key_range.end_bound() {
525 Bound::Included(key) => Bound::Included(key.clone()),
526 Bound::Excluded(key) => Bound::Excluded(key.clone()),
527 Bound::Unbounded => Bound::Unbounded,
528 };
529
530 (lower, upper)
531 }
532
533 fn visit_journaled_entries_in_bounds<E>(
534 backend: &DataStoreBackend,
535 bounds: (Bound<RawDataStoreKey>, Bound<RawDataStoreKey>),
536 reverse: bool,
537 mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
538 ) -> Result<(), E> {
539 let DataStoreBackend::Journaled {
540 canonical,
541 live,
542 tombstones,
543 } = backend
544 else {
545 return Ok(());
546 };
547
548 if canonical.is_empty() {
549 if reverse {
550 for (key, row) in live.range(bounds).rev() {
551 if visitor(key, row)?.should_stop() {
552 return Ok(());
553 }
554 }
555 } else {
556 for (key, row) in live.range(bounds) {
557 if visitor(key, row)?.should_stop() {
558 return Ok(());
559 }
560 }
561 }
562 return Ok(());
563 }
564
565 if live.is_empty() && tombstones.is_empty() {
566 if reverse {
567 for entry in canonical.range(bounds).rev() {
568 if visitor(entry.key(), &entry.value())?.should_stop() {
569 return Ok(());
570 }
571 }
572 } else {
573 for entry in canonical.range(bounds) {
574 if visitor(entry.key(), &entry.value())?.should_stop() {
575 return Ok(());
576 }
577 }
578 }
579 return Ok(());
580 }
581
582 match if reverse {
583 Direction::Desc
584 } else {
585 Direction::Asc
586 } {
587 Direction::Asc => visit_ordered_overlay(
588 canonical.range((bounds.0.clone(), bounds.1.clone())),
589 live.range((bounds.0, bounds.1)),
590 Direction::Asc,
591 |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
592 |canonical_entry| !tombstones.contains(canonical_entry.key()),
593 |live_entry| !tombstones.contains(live_entry.0),
594 |entry| {
595 let visit = match entry {
596 OrderedOverlayEntry::Canonical(canonical_entry) => {
597 visitor(canonical_entry.key(), &canonical_entry.value())?
598 }
599 OrderedOverlayEntry::Live((key, row)) => visitor(key, row)?,
600 };
601 Ok(if visit.should_stop() {
602 OrderedOverlayVisit::Stop
603 } else {
604 OrderedOverlayVisit::Continue
605 })
606 },
607 ),
608 Direction::Desc => visit_ordered_overlay(
609 canonical.range((bounds.0.clone(), bounds.1.clone())).rev(),
610 live.range((bounds.0, bounds.1)).rev(),
611 Direction::Desc,
612 |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
613 |canonical_entry| !tombstones.contains(canonical_entry.key()),
614 |live_entry| !tombstones.contains(live_entry.0),
615 |entry| {
616 let visit = match entry {
617 OrderedOverlayEntry::Canonical(canonical_entry) => {
618 visitor(canonical_entry.key(), &canonical_entry.value())?
619 }
620 OrderedOverlayEntry::Live((key, row)) => visitor(key, row)?,
621 };
622 Ok(if visit.should_stop() {
623 OrderedOverlayVisit::Stop
624 } else {
625 OrderedOverlayVisit::Continue
626 })
627 },
628 ),
629 }
630 }
631}
632
// Unit tests live in the sibling `tests` module, compiled only for test builds.
#[cfg(test)]
mod tests;