1use crate::db::{
7 direction::Direction,
8 index::{IndexEntryValue, key::RawIndexStoreKey},
9 ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
10};
11
12use candid::CandidType;
13use ic_memory::stable_structures::{
14 BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
15};
16use serde::Deserialize;
17#[cfg(test)]
18use std::cell::Cell;
19use std::collections::{BTreeMap as HeapBTreeMap, BTreeSet};
20use std::ops::Bound;
21
22#[cfg(test)]
23thread_local! {
24 static JOURNALED_SNAPSHOT_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
25}
26
27#[cfg(test)]
28fn record_journaled_snapshot_call() {
29 JOURNALED_SNAPSHOT_CALL_COUNT.with(|count| {
30 count.set(count.get().saturating_add(1));
31 });
32}
33
34#[cfg(test)]
35fn reset_journaled_snapshot_call_count_for_tests() {
36 JOURNALED_SNAPSHOT_CALL_COUNT.with(|count| count.set(0));
37}
38
39#[cfg(test)]
40fn journaled_snapshot_call_count_for_tests() -> u64 {
41 JOURNALED_SNAPSHOT_CALL_COUNT.with(Cell::get)
42}
43
44#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
52pub enum IndexState {
53 Building,
54 #[default]
55 Ready,
56 Dropping,
57}
58
59impl IndexState {
60 #[must_use]
62 pub const fn as_str(self) -> &'static str {
63 match self {
64 Self::Building => "building",
65 Self::Ready => "ready",
66 Self::Dropping => "dropping",
67 }
68 }
69}
70
71pub struct IndexStore {
80 pub(super) backend: IndexStoreBackend,
81 generation: u64,
82 state: IndexState,
83}
84
85pub(super) enum IndexStoreBackend {
86 Stable(StableBTreeMap<RawIndexStoreKey, IndexEntryValue, VirtualMemory<DefaultMemoryImpl>>),
87 Heap(HeapBTreeMap<RawIndexStoreKey, IndexEntryValue>),
88 Journaled {
89 canonical:
90 StableBTreeMap<RawIndexStoreKey, IndexEntryValue, VirtualMemory<DefaultMemoryImpl>>,
91 live: HeapBTreeMap<RawIndexStoreKey, IndexEntryValue>,
92 tombstones: BTreeSet<RawIndexStoreKey>,
93 },
94}
95
96#[derive(Clone, Copy, Debug, Eq, PartialEq)]
98pub(in crate::db) enum IndexStoreVisit {
99 Continue,
100 Stop,
101}
102
103impl IndexStoreVisit {
104 const fn should_stop(self) -> bool {
105 matches!(self, Self::Stop)
106 }
107}
108
109impl IndexStore {
110 #[must_use]
111 pub fn init(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
112 Self {
113 backend: IndexStoreBackend::Stable(StableBTreeMap::init(memory)),
114 generation: 0,
115 state: IndexState::Ready,
118 }
119 }
120
121 #[must_use]
123 pub const fn init_heap() -> Self {
124 Self {
125 backend: IndexStoreBackend::Heap(HeapBTreeMap::new()),
126 generation: 0,
127 state: IndexState::Ready,
128 }
129 }
130
131 #[must_use]
136 pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
137 Self {
138 backend: IndexStoreBackend::Journaled {
139 canonical: StableBTreeMap::init(memory),
140 live: HeapBTreeMap::new(),
141 tombstones: BTreeSet::new(),
142 },
143 generation: 0,
144 state: IndexState::Ready,
145 }
146 }
147
148 pub(in crate::db) fn visit_entries<E>(
151 &self,
152 mut visitor: impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<IndexStoreVisit, E>,
153 ) -> Result<(), E> {
154 match &self.backend {
155 IndexStoreBackend::Stable(map) => {
156 for entry in map.iter() {
157 if visitor(entry.key(), &entry.value())?.should_stop() {
158 return Ok(());
159 }
160 }
161 }
162 IndexStoreBackend::Heap(map) => {
163 for (key, value) in map {
164 if visitor(key, value)?.should_stop() {
165 return Ok(());
166 }
167 }
168 }
169 IndexStoreBackend::Journaled {
170 canonical: _,
171 live: _,
172 tombstones: _,
173 } => self.visit_journaled_entries_in_range(
174 (&Bound::Unbounded, &Bound::Unbounded),
175 Direction::Asc,
176 |key, value| visitor(key, value).map(IndexStoreVisit::should_stop),
177 )?,
178 }
179
180 Ok(())
181 }
182
183 pub(in crate::db) fn get(&self, key: &RawIndexStoreKey) -> Option<IndexEntryValue> {
184 match &self.backend {
185 IndexStoreBackend::Stable(map) => map.get(key),
186 IndexStoreBackend::Heap(map) => map.get(key).cloned(),
187 IndexStoreBackend::Journaled { .. } => Self::journaled_get(&self.backend, key),
188 }
189 }
190
191 pub fn len(&self) -> u64 {
192 match &self.backend {
193 IndexStoreBackend::Stable(map) => map.len(),
194 IndexStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
195 IndexStoreBackend::Journaled { .. } => {
196 let mut count = 0_u64;
197 let _: Result<(), std::convert::Infallible> = self.visit_entries(|_key, _value| {
198 count = count.saturating_add(1);
199 Ok(IndexStoreVisit::Continue)
200 });
201 count
202 }
203 }
204 }
205
206 pub fn is_empty(&self) -> bool {
207 match &self.backend {
208 IndexStoreBackend::Stable(map) => map.is_empty(),
209 IndexStoreBackend::Heap(map) => map.is_empty(),
210 IndexStoreBackend::Journaled { .. } => {
211 let mut empty = true;
212 let _: Result<(), std::convert::Infallible> = self.visit_entries(|_key, _value| {
213 empty = false;
214 Ok(IndexStoreVisit::Stop)
215 });
216 empty
217 }
218 }
219 }
220
221 #[must_use]
222 pub(in crate::db) const fn generation(&self) -> u64 {
223 self.generation
224 }
225
226 #[must_use]
228 pub(in crate::db) const fn state(&self) -> IndexState {
229 self.state
230 }
231
232 pub(in crate::db) const fn mark_building(&mut self) {
235 self.state = IndexState::Building;
236 }
237
238 pub(in crate::db) const fn mark_ready(&mut self) {
240 self.state = IndexState::Ready;
241 }
242
243 pub(in crate::db) const fn mark_dropping(&mut self) {
245 self.state = IndexState::Dropping;
246 }
247
248 pub(crate) fn insert(
249 &mut self,
250 key: RawIndexStoreKey,
251 entry: IndexEntryValue,
252 ) -> Option<IndexEntryValue> {
253 let previous_journaled = if matches!(self.backend, IndexStoreBackend::Journaled { .. }) {
254 self.get(&key)
255 } else {
256 None
257 };
258 let previous = match &mut self.backend {
259 IndexStoreBackend::Stable(map) => map.insert(key, entry),
260 IndexStoreBackend::Heap(map) => map.insert(key, entry),
261 IndexStoreBackend::Journaled {
262 live, tombstones, ..
263 } => {
264 tombstones.remove(&key);
265 live.insert(key, entry);
266 previous_journaled
267 }
268 };
269 self.bump_generation();
270 previous
271 }
272
273 pub(crate) fn remove(&mut self, key: &RawIndexStoreKey) -> Option<IndexEntryValue> {
274 let previous_journaled = if matches!(self.backend, IndexStoreBackend::Journaled { .. }) {
275 self.get(key)
276 } else {
277 None
278 };
279 let previous = match &mut self.backend {
280 IndexStoreBackend::Stable(map) => map.remove(key),
281 IndexStoreBackend::Heap(map) => map.remove(key),
282 IndexStoreBackend::Journaled {
283 live, tombstones, ..
284 } => {
285 live.remove(key);
286 tombstones.insert(key.clone());
287 previous_journaled
288 }
289 };
290 self.bump_generation();
291 previous
292 }
293
294 pub fn clear(&mut self) {
295 match &mut self.backend {
296 IndexStoreBackend::Stable(map) => map.clear_new(),
297 IndexStoreBackend::Heap(map) => map.clear(),
298 IndexStoreBackend::Journaled {
299 canonical,
300 live,
301 tombstones,
302 } => {
303 live.clear();
304 tombstones.clear();
305 for entry in canonical.iter() {
306 tombstones.insert(entry.key().clone());
307 }
308 }
309 }
310 self.bump_generation();
311 }
312
313 pub(in crate::db) fn fold_journaled_materialized_view(
316 &mut self,
317 ) -> Result<(), crate::error::InternalError> {
318 let entries = Self::journaled_entries_snapshot_for_fold(&self.backend);
319 let IndexStoreBackend::Journaled {
320 canonical,
321 live,
322 tombstones,
323 } = &mut self.backend
324 else {
325 return Err(crate::error::InternalError::store_invariant(
326 "journal index fold requires a journaled index store",
327 ));
328 };
329
330 canonical.clear_new();
331 for (key, value) in entries {
332 canonical.insert(key, value);
333 }
334 live.clear();
335 tombstones.clear();
336 self.bump_generation();
337
338 Ok(())
339 }
340
341 pub fn memory_bytes(&self) -> u64 {
343 let mut bytes = 0u64;
344 let _: Result<(), std::convert::Infallible> = self.visit_entries(|key, value| {
345 bytes = bytes.saturating_add(key.as_bytes().len() as u64 + value.len() as u64);
346 Ok(IndexStoreVisit::Continue)
347 });
348 bytes
349 }
350
351 const fn bump_generation(&mut self) {
352 self.generation = self.generation.saturating_add(1);
353 }
354
355 #[cfg(test)]
356 #[must_use]
357 pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
358 match &self.backend {
359 IndexStoreBackend::Stable(map)
360 | IndexStoreBackend::Journaled { canonical: map, .. } => map.len(),
361 IndexStoreBackend::Heap(_) => 0,
362 }
363 }
364
365 fn journaled_get(
366 backend: &IndexStoreBackend,
367 key: &RawIndexStoreKey,
368 ) -> Option<IndexEntryValue> {
369 let IndexStoreBackend::Journaled {
370 canonical,
371 live,
372 tombstones,
373 } = backend
374 else {
375 return None;
376 };
377
378 if tombstones.contains(key) {
379 return None;
380 }
381 live.get(key).cloned().or_else(|| canonical.get(key))
382 }
383
384 pub(super) fn journaled_entries_snapshot_for_fold(
385 backend: &IndexStoreBackend,
386 ) -> HeapBTreeMap<RawIndexStoreKey, IndexEntryValue> {
387 #[cfg(test)]
388 record_journaled_snapshot_call();
389
390 let IndexStoreBackend::Journaled {
391 canonical,
392 live,
393 tombstones,
394 } = backend
395 else {
396 return HeapBTreeMap::new();
397 };
398
399 let mut entries = HeapBTreeMap::new();
400 for entry in canonical.iter() {
401 let key = entry.key().clone();
402 if !tombstones.contains(&key) {
403 entries.insert(key, entry.value());
404 }
405 }
406 for (key, value) in live {
407 if !tombstones.contains(key) {
408 entries.insert(key.clone(), value.clone());
409 }
410 }
411
412 entries
413 }
414
415 pub(super) fn visit_journaled_entries_in_range<E>(
416 &self,
417 bounds: (&Bound<RawIndexStoreKey>, &Bound<RawIndexStoreKey>),
418 direction: Direction,
419 mut visit: impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<bool, E>,
420 ) -> Result<(), E> {
421 let IndexStoreBackend::Journaled {
422 canonical,
423 live,
424 tombstones,
425 } = &self.backend
426 else {
427 return Ok(());
428 };
429
430 let lower = bounds.0.clone();
431 let upper = bounds.1.clone();
432 match direction {
433 Direction::Asc if canonical.is_empty() => {
434 for (key, value) in live.range((lower, upper)) {
435 if visit(key, value)? {
436 return Ok(());
437 }
438 }
439 }
440 Direction::Desc if canonical.is_empty() => {
441 for (key, value) in live.range((lower, upper)).rev() {
442 if visit(key, value)? {
443 return Ok(());
444 }
445 }
446 }
447 Direction::Asc if live.is_empty() && tombstones.is_empty() => {
448 for entry in canonical.range((lower, upper)) {
449 if visit(entry.key(), &entry.value())? {
450 return Ok(());
451 }
452 }
453 }
454 Direction::Desc if live.is_empty() && tombstones.is_empty() => {
455 for entry in canonical.range((lower, upper)).rev() {
456 if visit(entry.key(), &entry.value())? {
457 return Ok(());
458 }
459 }
460 }
461 Direction::Asc => {
462 visit_ordered_overlay(
463 canonical.range((lower.clone(), upper.clone())),
464 live.range((lower, upper)),
465 direction,
466 |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
467 |canonical_entry| !tombstones.contains(canonical_entry.key()),
468 |live_entry| !tombstones.contains(live_entry.0),
469 |entry| {
470 let should_stop = match entry {
471 OrderedOverlayEntry::Canonical(canonical_entry) => {
472 visit(canonical_entry.key(), &canonical_entry.value())?
473 }
474 OrderedOverlayEntry::Live((key, value)) => visit(key, value)?,
475 };
476 Ok(if should_stop {
477 OrderedOverlayVisit::Stop
478 } else {
479 OrderedOverlayVisit::Continue
480 })
481 },
482 )?;
483 }
484 Direction::Desc => {
485 visit_ordered_overlay(
486 canonical.range((lower.clone(), upper.clone())).rev(),
487 live.range((lower, upper)).rev(),
488 direction,
489 |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
490 |canonical_entry| !tombstones.contains(canonical_entry.key()),
491 |live_entry| !tombstones.contains(live_entry.0),
492 |entry| {
493 let should_stop = match entry {
494 OrderedOverlayEntry::Canonical(canonical_entry) => {
495 visit(canonical_entry.key(), &canonical_entry.value())?
496 }
497 OrderedOverlayEntry::Live((key, value)) => visit(key, value)?,
498 };
499 Ok(if should_stop {
500 OrderedOverlayVisit::Stop
501 } else {
502 OrderedOverlayVisit::Continue
503 })
504 },
505 )?;
506 }
507 }
508
509 Ok(())
510 }
511}
512
513#[cfg(test)]
514mod tests {
515 use super::*;
516 use crate::{db::direction::Direction, testing::test_memory, traits::Storable};
517 use std::{borrow::Cow, convert::Infallible};
518
519 fn raw_key(value: u8) -> RawIndexStoreKey {
520 <RawIndexStoreKey as Storable>::from_bytes(Cow::Owned(vec![value]))
521 }
522
523 #[test]
524 fn journaled_mixed_index_range_traversal_streams_without_snapshot() {
525 let mut store = IndexStore::init_journaled(test_memory(93));
526 for value in [1_u8, 3, 5] {
527 store.insert(raw_key(value), IndexEntryValue::presence());
528 }
529 store
530 .fold_journaled_materialized_view()
531 .expect("canonical index seed should fold");
532
533 store.insert(raw_key(0), IndexEntryValue::presence());
534 store.insert(raw_key(4), IndexEntryValue::presence());
535 store.insert(raw_key(5), IndexEntryValue::presence());
536 store.remove(&raw_key(1));
537
538 let lower = Bound::Included(raw_key(0));
539 let upper = Bound::Included(raw_key(5));
540
541 reset_journaled_snapshot_call_count_for_tests();
542 let mut asc = Vec::new();
543 store
544 .visit_journaled_entries_in_range((&lower, &upper), Direction::Asc, |key, _value| {
545 asc.push(key.as_bytes()[0]);
546 Ok::<_, Infallible>(asc.len() == 2)
547 })
548 .expect("asc journaled index range traversal should succeed");
549 assert_eq!(asc, vec![0, 3]);
550 assert_eq!(
551 journaled_snapshot_call_count_for_tests(),
552 0,
553 "mixed journaled index range traversal should preserve early stop without materializing a snapshot",
554 );
555
556 reset_journaled_snapshot_call_count_for_tests();
557 let mut desc = Vec::new();
558 store
559 .visit_journaled_entries_in_range((&lower, &upper), Direction::Desc, |key, _value| {
560 desc.push(key.as_bytes()[0]);
561 Ok::<_, Infallible>(desc.len() == 2)
562 })
563 .expect("desc journaled index range traversal should succeed");
564 assert_eq!(desc, vec![5, 4]);
565 assert_eq!(
566 journaled_snapshot_call_count_for_tests(),
567 0,
568 "mixed reverse journaled index range traversal should preserve early stop without materializing a snapshot",
569 );
570 }
571}