1use crate::db::{
7 direction::Direction,
8 index::{IndexEntryValue, key::RawIndexStoreKey},
9 ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
10};
11
12use candid::CandidType;
13use ic_memory::stable_structures::{
14 BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
15};
16use serde::Deserialize;
17#[cfg(test)]
18use std::cell::Cell;
19use std::collections::{BTreeMap as HeapBTreeMap, BTreeSet};
20use std::ops::Bound;
21
22#[cfg(test)]
23thread_local! {
24 static JOURNALED_SNAPSHOT_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
25}
26
27#[cfg(test)]
28fn record_journaled_snapshot_call() {
29 JOURNALED_SNAPSHOT_CALL_COUNT.with(|count| {
30 count.set(count.get().saturating_add(1));
31 });
32}
33
34#[cfg(test)]
35fn reset_journaled_snapshot_call_count_for_tests() {
36 JOURNALED_SNAPSHOT_CALL_COUNT.with(|count| count.set(0));
37}
38
39#[cfg(test)]
40fn journaled_snapshot_call_count_for_tests() -> u64 {
41 JOURNALED_SNAPSHOT_CALL_COUNT.with(Cell::get)
42}
43
44#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
52pub enum IndexState {
53 Building,
54 #[default]
55 Ready,
56 Dropping,
57}
58
59impl IndexState {
60 #[must_use]
62 pub const fn as_str(self) -> &'static str {
63 match self {
64 Self::Building => "building",
65 Self::Ready => "ready",
66 Self::Dropping => "dropping",
67 }
68 }
69}
70
71pub struct IndexStore {
80 pub(super) backend: IndexStoreBackend,
81 generation: u64,
82 state: IndexState,
83}
84
85pub(super) enum IndexStoreBackend {
86 Stable(StableBTreeMap<RawIndexStoreKey, IndexEntryValue, VirtualMemory<DefaultMemoryImpl>>),
87 Heap(HeapBTreeMap<RawIndexStoreKey, IndexEntryValue>),
88 Journaled {
89 canonical:
90 StableBTreeMap<RawIndexStoreKey, IndexEntryValue, VirtualMemory<DefaultMemoryImpl>>,
91 live: HeapBTreeMap<RawIndexStoreKey, IndexEntryValue>,
92 tombstones: BTreeSet<RawIndexStoreKey>,
93 },
94}
95
96#[derive(Clone, Copy, Debug, Eq, PartialEq)]
98pub(in crate::db) enum IndexStoreVisit {
99 Continue,
100 Stop,
101}
102
103impl IndexStoreVisit {
104 const fn should_stop(self) -> bool {
105 matches!(self, Self::Stop)
106 }
107}
108
109impl IndexStore {
110 #[must_use]
111 pub fn init(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
112 Self {
113 backend: IndexStoreBackend::Stable(StableBTreeMap::init(memory)),
114 generation: 0,
115 state: IndexState::Ready,
118 }
119 }
120
121 #[must_use]
123 pub const fn init_heap() -> Self {
124 Self {
125 backend: IndexStoreBackend::Heap(HeapBTreeMap::new()),
126 generation: 0,
127 state: IndexState::Ready,
128 }
129 }
130
131 #[must_use]
136 pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
137 Self {
138 backend: IndexStoreBackend::Journaled {
139 canonical: StableBTreeMap::init(memory),
140 live: HeapBTreeMap::new(),
141 tombstones: BTreeSet::new(),
142 },
143 generation: 0,
144 state: IndexState::Ready,
145 }
146 }
147
148 pub(in crate::db) fn visit_entries<E>(
151 &self,
152 mut visitor: impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<IndexStoreVisit, E>,
153 ) -> Result<(), E> {
154 match &self.backend {
155 IndexStoreBackend::Stable(map) => {
156 for entry in map.iter() {
157 if visitor(entry.key(), &entry.value())?.should_stop() {
158 return Ok(());
159 }
160 }
161 }
162 IndexStoreBackend::Heap(map) => {
163 for (key, value) in map {
164 if visitor(key, value)?.should_stop() {
165 return Ok(());
166 }
167 }
168 }
169 IndexStoreBackend::Journaled {
170 canonical: _,
171 live: _,
172 tombstones: _,
173 } => self.visit_journaled_entries_in_range(
174 (&Bound::Unbounded, &Bound::Unbounded),
175 Direction::Asc,
176 |key, value| visitor(key, value).map(IndexStoreVisit::should_stop),
177 )?,
178 }
179
180 Ok(())
181 }
182
183 pub(in crate::db) fn get(&self, key: &RawIndexStoreKey) -> Option<IndexEntryValue> {
184 match &self.backend {
185 IndexStoreBackend::Stable(map) => map.get(key),
186 IndexStoreBackend::Heap(map) => map.get(key).cloned(),
187 IndexStoreBackend::Journaled { .. } => Self::journaled_get(&self.backend, key),
188 }
189 }
190
191 pub fn len(&self) -> u64 {
192 match &self.backend {
193 IndexStoreBackend::Stable(map) => map.len(),
194 IndexStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
195 IndexStoreBackend::Journaled { .. } => {
196 let mut count = 0_u64;
197 let _: Result<(), std::convert::Infallible> = self.visit_entries(|_key, _value| {
198 count = count.saturating_add(1);
199 Ok(IndexStoreVisit::Continue)
200 });
201 count
202 }
203 }
204 }
205
206 pub fn is_empty(&self) -> bool {
207 match &self.backend {
208 IndexStoreBackend::Stable(map) => map.is_empty(),
209 IndexStoreBackend::Heap(map) => map.is_empty(),
210 IndexStoreBackend::Journaled { .. } => {
211 let mut empty = true;
212 let _: Result<(), std::convert::Infallible> = self.visit_entries(|_key, _value| {
213 empty = false;
214 Ok(IndexStoreVisit::Stop)
215 });
216 empty
217 }
218 }
219 }
220
221 #[must_use]
222 pub(in crate::db) const fn generation(&self) -> u64 {
223 self.generation
224 }
225
226 #[must_use]
228 pub(in crate::db) const fn state(&self) -> IndexState {
229 self.state
230 }
231
232 pub(in crate::db) const fn mark_building(&mut self) {
235 self.state = IndexState::Building;
236 }
237
238 pub(in crate::db) const fn mark_ready(&mut self) {
240 self.state = IndexState::Ready;
241 }
242
243 pub(in crate::db) const fn mark_dropping(&mut self) {
245 self.state = IndexState::Dropping;
246 }
247
248 pub(crate) fn insert(
249 &mut self,
250 key: RawIndexStoreKey,
251 entry: IndexEntryValue,
252 ) -> Option<IndexEntryValue> {
253 let previous_journaled = if matches!(self.backend, IndexStoreBackend::Journaled { .. }) {
254 self.get(&key)
255 } else {
256 None
257 };
258 let previous = match &mut self.backend {
259 IndexStoreBackend::Stable(map) => map.insert(key, entry),
260 IndexStoreBackend::Heap(map) => map.insert(key, entry),
261 IndexStoreBackend::Journaled {
262 live, tombstones, ..
263 } => {
264 tombstones.remove(&key);
265 live.insert(key, entry);
266 previous_journaled
267 }
268 };
269 self.bump_generation();
270 previous
271 }
272
273 pub(crate) fn remove(&mut self, key: &RawIndexStoreKey) -> Option<IndexEntryValue> {
274 let previous_journaled = if matches!(self.backend, IndexStoreBackend::Journaled { .. }) {
275 self.get(key)
276 } else {
277 None
278 };
279 let previous = match &mut self.backend {
280 IndexStoreBackend::Stable(map) => map.remove(key),
281 IndexStoreBackend::Heap(map) => map.remove(key),
282 IndexStoreBackend::Journaled {
283 live, tombstones, ..
284 } => {
285 live.remove(key);
286 tombstones.insert(key.clone());
287 previous_journaled
288 }
289 };
290 self.bump_generation();
291 previous
292 }
293
294 pub fn clear(&mut self) {
295 match &mut self.backend {
296 IndexStoreBackend::Stable(map) => map.clear_new(),
297 IndexStoreBackend::Heap(map) => map.clear(),
298 IndexStoreBackend::Journaled {
299 canonical,
300 live,
301 tombstones,
302 } => {
303 live.clear();
304 tombstones.clear();
305 for entry in canonical.iter() {
306 tombstones.insert(entry.key().clone());
307 }
308 }
309 }
310 self.bump_generation();
311 }
312
313 pub(in crate::db) fn fold_journaled_materialized_view(
316 &mut self,
317 ) -> Result<(), crate::error::InternalError> {
318 let entries = Self::journaled_entries_snapshot_for_fold(&self.backend);
319 let IndexStoreBackend::Journaled {
320 canonical,
321 live,
322 tombstones,
323 } = &mut self.backend
324 else {
325 return Err(crate::error::InternalError::store_invariant());
326 };
327
328 canonical.clear_new();
329 for (key, value) in entries {
330 canonical.insert(key, value);
331 }
332 live.clear();
333 tombstones.clear();
334 self.bump_generation();
335
336 Ok(())
337 }
338
339 pub fn memory_bytes(&self) -> u64 {
341 let mut bytes = 0u64;
342 let _: Result<(), std::convert::Infallible> = self.visit_entries(|key, value| {
343 bytes = bytes.saturating_add(key.as_bytes().len() as u64 + value.len() as u64);
344 Ok(IndexStoreVisit::Continue)
345 });
346 bytes
347 }
348
349 const fn bump_generation(&mut self) {
350 self.generation = self.generation.saturating_add(1);
351 }
352
353 #[cfg(test)]
354 #[must_use]
355 pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
356 match &self.backend {
357 IndexStoreBackend::Stable(map)
358 | IndexStoreBackend::Journaled { canonical: map, .. } => map.len(),
359 IndexStoreBackend::Heap(_) => 0,
360 }
361 }
362
363 fn journaled_get(
364 backend: &IndexStoreBackend,
365 key: &RawIndexStoreKey,
366 ) -> Option<IndexEntryValue> {
367 let IndexStoreBackend::Journaled {
368 canonical,
369 live,
370 tombstones,
371 } = backend
372 else {
373 return None;
374 };
375
376 if tombstones.contains(key) {
377 return None;
378 }
379 live.get(key).cloned().or_else(|| canonical.get(key))
380 }
381
382 pub(super) fn journaled_entries_snapshot_for_fold(
383 backend: &IndexStoreBackend,
384 ) -> HeapBTreeMap<RawIndexStoreKey, IndexEntryValue> {
385 #[cfg(test)]
386 record_journaled_snapshot_call();
387
388 let IndexStoreBackend::Journaled {
389 canonical,
390 live,
391 tombstones,
392 } = backend
393 else {
394 return HeapBTreeMap::new();
395 };
396
397 let mut entries = HeapBTreeMap::new();
398 for entry in canonical.iter() {
399 let key = entry.key().clone();
400 if !tombstones.contains(&key) {
401 entries.insert(key, entry.value());
402 }
403 }
404 for (key, value) in live {
405 if !tombstones.contains(key) {
406 entries.insert(key.clone(), value.clone());
407 }
408 }
409
410 entries
411 }
412
413 pub(super) fn visit_journaled_entries_in_range<E>(
414 &self,
415 bounds: (&Bound<RawIndexStoreKey>, &Bound<RawIndexStoreKey>),
416 direction: Direction,
417 mut visit: impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<bool, E>,
418 ) -> Result<(), E> {
419 let IndexStoreBackend::Journaled {
420 canonical,
421 live,
422 tombstones,
423 } = &self.backend
424 else {
425 return Ok(());
426 };
427
428 let lower = bounds.0.clone();
429 let upper = bounds.1.clone();
430 match direction {
431 Direction::Asc if canonical.is_empty() => {
432 for (key, value) in live.range((lower, upper)) {
433 if visit(key, value)? {
434 return Ok(());
435 }
436 }
437 }
438 Direction::Desc if canonical.is_empty() => {
439 for (key, value) in live.range((lower, upper)).rev() {
440 if visit(key, value)? {
441 return Ok(());
442 }
443 }
444 }
445 Direction::Asc if live.is_empty() && tombstones.is_empty() => {
446 for entry in canonical.range((lower, upper)) {
447 if visit(entry.key(), &entry.value())? {
448 return Ok(());
449 }
450 }
451 }
452 Direction::Desc if live.is_empty() && tombstones.is_empty() => {
453 for entry in canonical.range((lower, upper)).rev() {
454 if visit(entry.key(), &entry.value())? {
455 return Ok(());
456 }
457 }
458 }
459 Direction::Asc => {
460 visit_ordered_overlay(
461 canonical.range((lower.clone(), upper.clone())),
462 live.range((lower, upper)),
463 direction,
464 |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
465 |canonical_entry| !tombstones.contains(canonical_entry.key()),
466 |live_entry| !tombstones.contains(live_entry.0),
467 |entry| {
468 let should_stop = match entry {
469 OrderedOverlayEntry::Canonical(canonical_entry) => {
470 visit(canonical_entry.key(), &canonical_entry.value())?
471 }
472 OrderedOverlayEntry::Live((key, value)) => visit(key, value)?,
473 };
474 Ok(if should_stop {
475 OrderedOverlayVisit::Stop
476 } else {
477 OrderedOverlayVisit::Continue
478 })
479 },
480 )?;
481 }
482 Direction::Desc => {
483 visit_ordered_overlay(
484 canonical.range((lower.clone(), upper.clone())).rev(),
485 live.range((lower, upper)).rev(),
486 direction,
487 |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
488 |canonical_entry| !tombstones.contains(canonical_entry.key()),
489 |live_entry| !tombstones.contains(live_entry.0),
490 |entry| {
491 let should_stop = match entry {
492 OrderedOverlayEntry::Canonical(canonical_entry) => {
493 visit(canonical_entry.key(), &canonical_entry.value())?
494 }
495 OrderedOverlayEntry::Live((key, value)) => visit(key, value)?,
496 };
497 Ok(if should_stop {
498 OrderedOverlayVisit::Stop
499 } else {
500 OrderedOverlayVisit::Continue
501 })
502 },
503 )?;
504 }
505 }
506
507 Ok(())
508 }
509}
510
511#[cfg(test)]
512mod tests {
513 use super::*;
514 use crate::{db::direction::Direction, testing::test_memory, traits::Storable};
515 use std::{borrow::Cow, convert::Infallible};
516
517 fn raw_key(value: u8) -> RawIndexStoreKey {
518 <RawIndexStoreKey as Storable>::from_bytes(Cow::Owned(vec![value]))
519 }
520
521 #[test]
522 fn journaled_mixed_index_range_traversal_streams_without_snapshot() {
523 let mut store = IndexStore::init_journaled(test_memory(93));
524 for value in [1_u8, 3, 5] {
525 store.insert(raw_key(value), IndexEntryValue::presence());
526 }
527 store
528 .fold_journaled_materialized_view()
529 .expect("canonical index seed should fold");
530
531 store.insert(raw_key(0), IndexEntryValue::presence());
532 store.insert(raw_key(4), IndexEntryValue::presence());
533 store.insert(raw_key(5), IndexEntryValue::presence());
534 store.remove(&raw_key(1));
535
536 let lower = Bound::Included(raw_key(0));
537 let upper = Bound::Included(raw_key(5));
538
539 reset_journaled_snapshot_call_count_for_tests();
540 let mut asc = Vec::new();
541 store
542 .visit_journaled_entries_in_range((&lower, &upper), Direction::Asc, |key, _value| {
543 asc.push(key.as_bytes()[0]);
544 Ok::<_, Infallible>(asc.len() == 2)
545 })
546 .expect("asc journaled index range traversal should succeed");
547 assert_eq!(asc, vec![0, 3]);
548 assert_eq!(
549 journaled_snapshot_call_count_for_tests(),
550 0,
551 "mixed journaled index range traversal should preserve early stop without materializing a snapshot",
552 );
553
554 reset_journaled_snapshot_call_count_for_tests();
555 let mut desc = Vec::new();
556 store
557 .visit_journaled_entries_in_range((&lower, &upper), Direction::Desc, |key, _value| {
558 desc.push(key.as_bytes()[0]);
559 Ok::<_, Infallible>(desc.len() == 2)
560 })
561 .expect("desc journaled index range traversal should succeed");
562 assert_eq!(desc, vec![5, 4]);
563 assert_eq!(
564 journaled_snapshot_call_count_for_tests(),
565 0,
566 "mixed reverse journaled index range traversal should preserve early stop without materializing a snapshot",
567 );
568 }
569}