1use crate::{
7 db::{
8 data::{CanonicalRow, RawDataStoreKey, RawRow},
9 direction::Direction,
10 key_taxonomy::RawDataStoreKeyRange,
11 ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
12 },
13 types::EntityTag,
14};
15use ic_memory::stable_structures::{
16 BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
17};
18#[cfg(feature = "diagnostics")]
19use std::cell::Cell;
20use std::collections::{BTreeMap as HeapBTreeMap, BTreeSet};
21use std::convert::Infallible;
22use std::ops::{Bound, RangeBounds};
23
24#[cfg(feature = "diagnostics")]
25thread_local! {
26 static DATA_STORE_GET_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
27}
28
29#[cfg(feature = "diagnostics")]
30fn record_data_store_get_call() {
31 DATA_STORE_GET_CALL_COUNT.with(|count| {
32 count.set(count.get().saturating_add(1));
33 });
34}
35
36pub struct DataStore {
46 backend: DataStoreBackend,
47}
48
49enum DataStoreBackend {
50 Heap(HeapBTreeMap<RawDataStoreKey, RawRow>),
51 Journaled {
52 canonical: StableBTreeMap<RawDataStoreKey, RawRow, VirtualMemory<DefaultMemoryImpl>>,
53 live: HeapBTreeMap<RawDataStoreKey, RawRow>,
54 tombstones: BTreeSet<RawDataStoreKey>,
55 },
56}
57
58#[derive(Clone, Copy, Debug, Eq, PartialEq)]
60pub(in crate::db) enum StoreVisit {
61 Continue,
62 Stop,
63}
64
65impl StoreVisit {
66 const fn should_stop(self) -> bool {
67 matches!(self, Self::Stop)
68 }
69}
70
71impl DataStore {
72 #[must_use]
74 pub const fn init_heap() -> Self {
75 Self {
76 backend: DataStoreBackend::Heap(HeapBTreeMap::new()),
77 }
78 }
79
80 #[must_use]
86 pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
87 Self {
88 backend: DataStoreBackend::Journaled {
89 canonical: StableBTreeMap::init(memory),
90 live: HeapBTreeMap::new(),
91 tombstones: BTreeSet::new(),
92 },
93 }
94 }
95
96 pub(in crate::db) fn insert(
98 &mut self,
99 key: RawDataStoreKey,
100 row: CanonicalRow,
101 ) -> Option<RawRow> {
102 let row = row.into_raw_row();
103 let previous_journaled = if matches!(self.backend, DataStoreBackend::Journaled { .. }) {
104 self.get(&key)
105 } else {
106 None
107 };
108 match &mut self.backend {
109 DataStoreBackend::Heap(map) => map.insert(key, row),
110 DataStoreBackend::Journaled {
111 live, tombstones, ..
112 } => {
113 tombstones.remove(&key);
114 live.insert(key, row);
115 previous_journaled
116 }
117 }
118 }
119
120 #[cfg(test)]
122 pub(in crate::db) fn insert_raw_for_test(
123 &mut self,
124 key: RawDataStoreKey,
125 row: RawRow,
126 ) -> Option<RawRow> {
127 let previous_journaled = if matches!(self.backend, DataStoreBackend::Journaled { .. }) {
128 self.get(&key)
129 } else {
130 None
131 };
132 match &mut self.backend {
133 DataStoreBackend::Heap(map) => map.insert(key, row),
134 DataStoreBackend::Journaled {
135 live, tombstones, ..
136 } => {
137 tombstones.remove(&key);
138 live.insert(key, row);
139 previous_journaled
140 }
141 }
142 }
143
144 pub(in crate::db) fn remove(&mut self, key: &RawDataStoreKey) -> Option<RawRow> {
146 let previous_journaled = if matches!(self.backend, DataStoreBackend::Journaled { .. }) {
147 self.get(key)
148 } else {
149 None
150 };
151 match &mut self.backend {
152 DataStoreBackend::Heap(map) => map.remove(key),
153 DataStoreBackend::Journaled {
154 live, tombstones, ..
155 } => {
156 live.remove(key);
157 tombstones.insert(key.clone());
158 previous_journaled
159 }
160 }
161 }
162
163 pub(in crate::db) fn reset_journaled_live_projection(
166 &mut self,
167 ) -> Result<(), crate::error::InternalError> {
168 let DataStoreBackend::Journaled {
169 live, tombstones, ..
170 } = &mut self.backend
171 else {
172 return Err(crate::error::InternalError::store_invariant());
173 };
174
175 live.clear();
176 tombstones.clear();
177
178 Ok(())
179 }
180
181 pub(in crate::db) fn apply_recovered_journal_put(
183 &mut self,
184 key: RawDataStoreKey,
185 row: RawRow,
186 ) -> Result<Option<RawRow>, crate::error::InternalError> {
187 let DataStoreBackend::Journaled {
188 canonical,
189 live,
190 tombstones,
191 } = &mut self.backend
192 else {
193 return Err(crate::error::InternalError::store_invariant());
194 };
195
196 let previous = if tombstones.contains(&key) {
197 None
198 } else {
199 live.get(&key).cloned().or_else(|| canonical.get(&key))
200 };
201 tombstones.remove(&key);
202 live.insert(key, row);
203
204 Ok(previous)
205 }
206
207 pub(in crate::db) fn apply_recovered_journal_delete(
209 &mut self,
210 key: &RawDataStoreKey,
211 ) -> Result<Option<RawRow>, crate::error::InternalError> {
212 let DataStoreBackend::Journaled {
213 canonical,
214 live,
215 tombstones,
216 } = &mut self.backend
217 else {
218 return Err(crate::error::InternalError::store_invariant());
219 };
220
221 let previous = if tombstones.contains(key) {
222 None
223 } else {
224 live.get(key).cloned().or_else(|| canonical.get(key))
225 };
226 live.remove(key);
227 tombstones.insert(key.clone());
228
229 Ok(previous)
230 }
231
232 pub(in crate::db) fn fold_recovered_journal_put(
234 &mut self,
235 key: RawDataStoreKey,
236 row: RawRow,
237 ) -> Result<Option<RawRow>, crate::error::InternalError> {
238 let DataStoreBackend::Journaled { canonical, .. } = &mut self.backend else {
239 return Err(crate::error::InternalError::store_invariant());
240 };
241
242 Ok(canonical.insert(key, row))
243 }
244
245 pub(in crate::db) fn fold_recovered_journal_delete(
247 &mut self,
248 key: &RawDataStoreKey,
249 ) -> Result<Option<RawRow>, crate::error::InternalError> {
250 let DataStoreBackend::Journaled { canonical, .. } = &mut self.backend else {
251 return Err(crate::error::InternalError::store_invariant());
252 };
253
254 Ok(canonical.remove(key))
255 }
256
257 pub(in crate::db) fn get(&self, key: &RawDataStoreKey) -> Option<RawRow> {
259 #[cfg(feature = "diagnostics")]
260 record_data_store_get_call();
261
262 match &self.backend {
263 DataStoreBackend::Heap(map) => map.get(key).cloned(),
264 DataStoreBackend::Journaled { .. } => Self::journaled_get_raw(&self.backend, key),
265 }
266 }
267
268 #[must_use]
270 pub(in crate::db) fn contains(&self, key: &RawDataStoreKey) -> bool {
271 match &self.backend {
272 DataStoreBackend::Heap(map) => map.contains_key(key),
273 DataStoreBackend::Journaled { .. } => {
274 Self::journaled_get_raw(&self.backend, key).is_some()
275 }
276 }
277 }
278
279 #[cfg(test)]
281 pub(in crate::db) fn clear(&mut self) {
282 match &mut self.backend {
283 DataStoreBackend::Heap(map) => map.clear(),
284 DataStoreBackend::Journaled {
285 canonical,
286 live,
287 tombstones,
288 } => {
289 canonical.clear_new();
290 live.clear();
291 tombstones.clear();
292 }
293 }
294 }
295
296 #[must_use]
298 pub(in crate::db) fn len(&self) -> u64 {
299 match &self.backend {
300 DataStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
301 DataStoreBackend::Journaled { .. } => {
302 let mut count = 0_u64;
303 let _: Result<(), Infallible> = self.visit_entries(|_key, _row| {
304 count = count.saturating_add(1);
305 Ok(StoreVisit::Continue)
306 });
307 count
308 }
309 }
310 }
311
312 #[must_use]
314 #[cfg(test)]
315 pub(in crate::db) fn is_empty(&self) -> bool {
316 match &self.backend {
317 DataStoreBackend::Heap(map) => map.is_empty(),
318 DataStoreBackend::Journaled { .. } => {
319 let mut empty = true;
320 let _: Result<(), Infallible> = self.visit_entries(|_key, _row| {
321 empty = false;
322 Ok(StoreVisit::Stop)
323 });
324 empty
325 }
326 }
327 }
328
329 pub(in crate::db) fn visit_entries<E>(
331 &self,
332 mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
333 ) -> Result<(), E> {
334 match &self.backend {
335 DataStoreBackend::Heap(map) => {
336 for (key, row) in map {
337 if visitor(key, row)?.should_stop() {
338 break;
339 }
340 }
341 }
342 DataStoreBackend::Journaled {
343 canonical: _,
344 live: _,
345 tombstones: _,
346 } => Self::visit_journaled_entries_in_bounds(
347 &self.backend,
348 (Bound::Unbounded, Bound::Unbounded),
349 false,
350 visitor,
351 )?,
352 }
353
354 Ok(())
355 }
356
357 pub(in crate::db) fn visit_entries_rev<E>(
359 &self,
360 mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
361 ) -> Result<(), E> {
362 match &self.backend {
363 DataStoreBackend::Heap(map) => {
364 for (key, row) in map.iter().rev() {
365 if visitor(key, row)?.should_stop() {
366 break;
367 }
368 }
369 }
370 DataStoreBackend::Journaled {
371 canonical: _,
372 live: _,
373 tombstones: _,
374 } => Self::visit_journaled_entries_in_bounds(
375 &self.backend,
376 (Bound::Unbounded, Bound::Unbounded),
377 true,
378 visitor,
379 )?,
380 }
381
382 Ok(())
383 }
384
385 pub(in crate::db) fn visit_range<E>(
387 &self,
388 key_range: impl RangeBounds<RawDataStoreKey>,
389 mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
390 ) -> Result<(), E> {
391 let bounds = Self::owned_range_bounds(&key_range);
392 match &self.backend {
393 DataStoreBackend::Heap(map) => {
394 for (key, row) in map.range((bounds.0.clone(), bounds.1)) {
395 if visitor(key, row)?.should_stop() {
396 break;
397 }
398 }
399 }
400 DataStoreBackend::Journaled {
401 canonical: _,
402 live: _,
403 tombstones: _,
404 } => Self::visit_journaled_entries_in_bounds(&self.backend, bounds, false, visitor)?,
405 }
406
407 Ok(())
408 }
409
410 pub(in crate::db) fn visit_range_rev<E>(
412 &self,
413 key_range: impl RangeBounds<RawDataStoreKey>,
414 mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
415 ) -> Result<(), E> {
416 let bounds = Self::owned_range_bounds(&key_range);
417 match &self.backend {
418 DataStoreBackend::Heap(map) => {
419 for (key, row) in map.range((bounds.0.clone(), bounds.1)).rev() {
420 if visitor(key, row)?.should_stop() {
421 break;
422 }
423 }
424 }
425 DataStoreBackend::Journaled {
426 canonical: _,
427 live: _,
428 tombstones: _,
429 } => Self::visit_journaled_entries_in_bounds(&self.backend, bounds, true, visitor)?,
430 }
431
432 Ok(())
433 }
434
435 pub(in crate::db) fn visit_entity<E>(
437 &self,
438 entity: EntityTag,
439 visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
440 ) -> Result<(), E> {
441 let range = RawDataStoreKeyRange::entity_prefix(entity);
442 self.visit_range(RawDataStoreKey::store_range_bounds(&range), visitor)
443 }
444
445 pub(in crate::db) fn memory_bytes(&self) -> u64 {
447 let mut bytes = 0u64;
449 let _: Result<(), Infallible> = self.visit_entries(|key, row| {
450 bytes = bytes.saturating_add(key.as_bytes().len() as u64 + row.len() as u64);
451 Ok(StoreVisit::Continue)
452 });
453 bytes
454 }
455
456 #[cfg(feature = "diagnostics")]
458 pub(in crate::db) fn current_get_call_count() -> u64 {
459 DATA_STORE_GET_CALL_COUNT.with(Cell::get)
460 }
461
462 #[cfg(test)]
463 #[must_use]
464 pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
465 match &self.backend {
466 DataStoreBackend::Journaled { canonical: map, .. } => map.len(),
467 DataStoreBackend::Heap(_) => 0,
468 }
469 }
470
471 fn journaled_get_raw(backend: &DataStoreBackend, key: &RawDataStoreKey) -> Option<RawRow> {
472 let DataStoreBackend::Journaled {
473 canonical,
474 live,
475 tombstones,
476 } = backend
477 else {
478 return None;
479 };
480
481 if tombstones.contains(key) {
482 return None;
483 }
484 live.get(key).cloned().or_else(|| canonical.get(key))
485 }
486
487 fn owned_range_bounds(
488 key_range: &impl RangeBounds<RawDataStoreKey>,
489 ) -> (Bound<RawDataStoreKey>, Bound<RawDataStoreKey>) {
490 let lower = match key_range.start_bound() {
491 Bound::Included(key) => Bound::Included(key.clone()),
492 Bound::Excluded(key) => Bound::Excluded(key.clone()),
493 Bound::Unbounded => Bound::Unbounded,
494 };
495 let upper = match key_range.end_bound() {
496 Bound::Included(key) => Bound::Included(key.clone()),
497 Bound::Excluded(key) => Bound::Excluded(key.clone()),
498 Bound::Unbounded => Bound::Unbounded,
499 };
500
501 (lower, upper)
502 }
503
504 fn visit_journaled_entries_in_bounds<E>(
505 backend: &DataStoreBackend,
506 bounds: (Bound<RawDataStoreKey>, Bound<RawDataStoreKey>),
507 reverse: bool,
508 mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
509 ) -> Result<(), E> {
510 let DataStoreBackend::Journaled {
511 canonical,
512 live,
513 tombstones,
514 } = backend
515 else {
516 return Ok(());
517 };
518
519 if canonical.is_empty() {
520 if reverse {
521 for (key, row) in live.range(bounds).rev() {
522 if visitor(key, row)?.should_stop() {
523 return Ok(());
524 }
525 }
526 } else {
527 for (key, row) in live.range(bounds) {
528 if visitor(key, row)?.should_stop() {
529 return Ok(());
530 }
531 }
532 }
533 return Ok(());
534 }
535
536 if live.is_empty() && tombstones.is_empty() {
537 if reverse {
538 for entry in canonical.range(bounds).rev() {
539 if visitor(entry.key(), &entry.value())?.should_stop() {
540 return Ok(());
541 }
542 }
543 } else {
544 for entry in canonical.range(bounds) {
545 if visitor(entry.key(), &entry.value())?.should_stop() {
546 return Ok(());
547 }
548 }
549 }
550 return Ok(());
551 }
552
553 match if reverse {
554 Direction::Desc
555 } else {
556 Direction::Asc
557 } {
558 Direction::Asc => visit_ordered_overlay(
559 canonical.range((bounds.0.clone(), bounds.1.clone())),
560 live.range((bounds.0, bounds.1)),
561 Direction::Asc,
562 |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
563 |canonical_entry| !tombstones.contains(canonical_entry.key()),
564 |live_entry| !tombstones.contains(live_entry.0),
565 |entry| {
566 let visit = match entry {
567 OrderedOverlayEntry::Canonical(canonical_entry) => {
568 visitor(canonical_entry.key(), &canonical_entry.value())?
569 }
570 OrderedOverlayEntry::Live((key, row)) => visitor(key, row)?,
571 };
572 Ok(if visit.should_stop() {
573 OrderedOverlayVisit::Stop
574 } else {
575 OrderedOverlayVisit::Continue
576 })
577 },
578 ),
579 Direction::Desc => visit_ordered_overlay(
580 canonical.range((bounds.0.clone(), bounds.1.clone())).rev(),
581 live.range((bounds.0, bounds.1)).rev(),
582 Direction::Desc,
583 |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
584 |canonical_entry| !tombstones.contains(canonical_entry.key()),
585 |live_entry| !tombstones.contains(live_entry.0),
586 |entry| {
587 let visit = match entry {
588 OrderedOverlayEntry::Canonical(canonical_entry) => {
589 visitor(canonical_entry.key(), &canonical_entry.value())?
590 }
591 OrderedOverlayEntry::Live((key, row)) => visitor(key, row)?,
592 };
593 Ok(if visit.should_stop() {
594 OrderedOverlayVisit::Stop
595 } else {
596 OrderedOverlayVisit::Continue
597 })
598 },
599 ),
600 }
601 }
602}
603
604#[cfg(test)]
605mod tests;