1use crate::db::{
7 direction::Direction,
8 index::{IndexEntryValue, key::RawIndexStoreKey},
9 ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
10};
11
12use candid::CandidType;
13use ic_memory::stable_structures::{
14 BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
15};
16use serde::Deserialize;
17#[cfg(any(test, feature = "diagnostics"))]
18use std::cell::Cell;
19use std::collections::{BTreeMap as HeapBTreeMap, BTreeSet};
20use std::ops::Bound;
21
// Test-only, per-thread tally of fold-snapshot materializations. It is bumped
// by `record_journaled_snapshot_call` and read/reset by the `*_for_tests`
// helpers in this file.
#[cfg(test)]
thread_local! {
    static JOURNALED_SNAPSHOT_CALL_COUNT: Cell<u64> = const { Cell::new(0_u64) };
}
26
// Per-thread counters compiled in only with the `diagnostics` feature.
#[cfg(feature = "diagnostics")]
thread_local! {
    /// Number of `IndexStore::get` calls recorded on this thread.
    static INDEX_STORE_GET_CALL_COUNT: Cell<u64> = const { Cell::new(0_u64) };
    /// Number of entries handed to traversal visitors on this thread.
    static INDEX_STORE_ENTRY_READ_COUNT: Cell<u64> = const { Cell::new(0_u64) };
}
32
/// Adds one to the per-thread `get` call counter, saturating at `u64::MAX`.
#[cfg(feature = "diagnostics")]
fn record_index_store_get_call() {
    INDEX_STORE_GET_CALL_COUNT.with(|calls| {
        let bumped = calls.get().saturating_add(1);
        calls.set(bumped);
    });
}
39
/// Adds one to the per-thread entry-read counter, saturating at `u64::MAX`.
#[cfg(feature = "diagnostics")]
fn record_index_store_entry_read() {
    INDEX_STORE_ENTRY_READ_COUNT.with(|reads| {
        let bumped = reads.get().saturating_add(1);
        reads.set(bumped);
    });
}
46
47fn visit_index_store_entry<E>(
48 key: &RawIndexStoreKey,
49 value: &IndexEntryValue,
50 visit: &mut impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<bool, E>,
51) -> Result<bool, E> {
52 #[cfg(feature = "diagnostics")]
53 record_index_store_entry_read();
54
55 visit(key, value)
56}
57
/// Test-only: adds one (saturating) to the fold-snapshot call counter.
#[cfg(test)]
fn record_journaled_snapshot_call() {
    JOURNALED_SNAPSHOT_CALL_COUNT.with(|calls| {
        let bumped = calls.get().saturating_add(1);
        calls.set(bumped);
    });
}
64
/// Test-only: sets the fold-snapshot call counter for this thread back to zero.
#[cfg(test)]
fn reset_journaled_snapshot_call_count_for_tests() {
    JOURNALED_SNAPSHOT_CALL_COUNT.with(|calls| {
        calls.set(0);
    });
}
69
/// Test-only: returns the fold-snapshot call counter for this thread.
#[cfg(test)]
fn journaled_snapshot_call_count_for_tests() -> u64 {
    JOURNALED_SNAPSHOT_CALL_COUNT.with(Cell::get)
}
74
75#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
83pub enum IndexState {
84 Building,
85 #[default]
86 Ready,
87 Dropping,
88}
89
90impl IndexState {
91 #[must_use]
93 pub const fn as_str(self) -> &'static str {
94 match self {
95 Self::Building => "building",
96 Self::Ready => "ready",
97 Self::Dropping => "dropping",
98 }
99 }
100}
101
102pub struct IndexStore {
111 pub(super) backend: IndexStoreBackend,
112 generation: u64,
113 state: IndexState,
114}
115
116pub(super) enum IndexStoreBackend {
117 Heap(HeapBTreeMap<RawIndexStoreKey, IndexEntryValue>),
118 Journaled {
119 canonical:
120 StableBTreeMap<RawIndexStoreKey, IndexEntryValue, VirtualMemory<DefaultMemoryImpl>>,
121 live: HeapBTreeMap<RawIndexStoreKey, IndexEntryValue>,
122 tombstones: BTreeSet<RawIndexStoreKey>,
123 },
124}
125
126#[derive(Clone, Copy, Debug, Eq, PartialEq)]
128pub(in crate::db) enum IndexStoreVisit {
129 Continue,
130 Stop,
131}
132
133impl IndexStoreVisit {
134 const fn should_stop(self) -> bool {
135 matches!(self, Self::Stop)
136 }
137}
138
139impl IndexStore {
140 #[must_use]
142 pub const fn init_heap() -> Self {
143 Self {
144 backend: IndexStoreBackend::Heap(HeapBTreeMap::new()),
145 generation: 0,
146 state: IndexState::Ready,
147 }
148 }
149
150 #[must_use]
155 pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
156 Self {
157 backend: IndexStoreBackend::Journaled {
158 canonical: StableBTreeMap::init(memory),
159 live: HeapBTreeMap::new(),
160 tombstones: BTreeSet::new(),
161 },
162 generation: 0,
163 state: IndexState::Ready,
164 }
165 }
166
167 pub(in crate::db) fn visit_entries<E>(
170 &self,
171 mut visitor: impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<IndexStoreVisit, E>,
172 ) -> Result<(), E> {
173 match &self.backend {
174 IndexStoreBackend::Heap(map) => {
175 for (key, value) in map {
176 #[cfg(feature = "diagnostics")]
177 record_index_store_entry_read();
178
179 if visitor(key, value)?.should_stop() {
180 return Ok(());
181 }
182 }
183 }
184 IndexStoreBackend::Journaled {
185 canonical: _,
186 live: _,
187 tombstones: _,
188 } => self.visit_journaled_entries_in_range(
189 (&Bound::Unbounded, &Bound::Unbounded),
190 Direction::Asc,
191 |key, value| visitor(key, value).map(IndexStoreVisit::should_stop),
192 )?,
193 }
194
195 Ok(())
196 }
197
198 pub(in crate::db) fn get(&self, key: &RawIndexStoreKey) -> Option<IndexEntryValue> {
199 #[cfg(feature = "diagnostics")]
200 record_index_store_get_call();
201
202 match &self.backend {
203 IndexStoreBackend::Heap(map) => map.get(key).cloned(),
204 IndexStoreBackend::Journaled { .. } => Self::journaled_get(&self.backend, key),
205 }
206 }
207
208 pub fn len(&self) -> u64 {
209 match &self.backend {
210 IndexStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
211 IndexStoreBackend::Journaled { .. } => {
212 let mut count = 0_u64;
213 let _: Result<(), std::convert::Infallible> = self.visit_entries(|_key, _value| {
214 count = count.saturating_add(1);
215 Ok(IndexStoreVisit::Continue)
216 });
217 count
218 }
219 }
220 }
221
222 pub fn is_empty(&self) -> bool {
223 match &self.backend {
224 IndexStoreBackend::Heap(map) => map.is_empty(),
225 IndexStoreBackend::Journaled { .. } => {
226 let mut empty = true;
227 let _: Result<(), std::convert::Infallible> = self.visit_entries(|_key, _value| {
228 empty = false;
229 Ok(IndexStoreVisit::Stop)
230 });
231 empty
232 }
233 }
234 }
235
236 #[must_use]
237 pub(in crate::db) const fn generation(&self) -> u64 {
238 self.generation
239 }
240
241 #[must_use]
243 pub(in crate::db) const fn state(&self) -> IndexState {
244 self.state
245 }
246
247 pub(in crate::db) const fn mark_building(&mut self) {
250 self.state = IndexState::Building;
251 }
252
253 pub(in crate::db) const fn mark_ready(&mut self) {
255 self.state = IndexState::Ready;
256 }
257
258 pub(in crate::db) const fn mark_dropping(&mut self) {
260 self.state = IndexState::Dropping;
261 }
262
263 pub(crate) fn insert(
264 &mut self,
265 key: RawIndexStoreKey,
266 entry: IndexEntryValue,
267 ) -> Option<IndexEntryValue> {
268 let previous_journaled = if matches!(self.backend, IndexStoreBackend::Journaled { .. }) {
269 self.get(&key)
270 } else {
271 None
272 };
273 let previous = match &mut self.backend {
274 IndexStoreBackend::Heap(map) => map.insert(key, entry),
275 IndexStoreBackend::Journaled {
276 live, tombstones, ..
277 } => {
278 tombstones.remove(&key);
279 live.insert(key, entry);
280 previous_journaled
281 }
282 };
283 self.bump_generation();
284 previous
285 }
286
287 pub(crate) fn remove(&mut self, key: &RawIndexStoreKey) -> Option<IndexEntryValue> {
288 let previous_journaled = if matches!(self.backend, IndexStoreBackend::Journaled { .. }) {
289 self.get(key)
290 } else {
291 None
292 };
293 let previous = match &mut self.backend {
294 IndexStoreBackend::Heap(map) => map.remove(key),
295 IndexStoreBackend::Journaled {
296 live, tombstones, ..
297 } => {
298 live.remove(key);
299 tombstones.insert(key.clone());
300 previous_journaled
301 }
302 };
303 self.bump_generation();
304 previous
305 }
306
307 pub fn clear(&mut self) {
308 match &mut self.backend {
309 IndexStoreBackend::Heap(map) => map.clear(),
310 IndexStoreBackend::Journaled {
311 canonical,
312 live,
313 tombstones,
314 } => {
315 live.clear();
316 tombstones.clear();
317 for entry in canonical.iter() {
318 tombstones.insert(entry.key().clone());
319 }
320 }
321 }
322 self.bump_generation();
323 }
324
325 pub(in crate::db) fn fold_journaled_materialized_view(
328 &mut self,
329 ) -> Result<(), crate::error::InternalError> {
330 let entries = Self::journaled_entries_snapshot_for_fold(&self.backend);
331 let IndexStoreBackend::Journaled {
332 canonical,
333 live,
334 tombstones,
335 } = &mut self.backend
336 else {
337 return Err(crate::error::InternalError::store_invariant());
338 };
339
340 canonical.clear_new();
341 for (key, value) in entries {
342 canonical.insert(key, value);
343 }
344 live.clear();
345 tombstones.clear();
346 self.bump_generation();
347
348 Ok(())
349 }
350
351 pub fn memory_bytes(&self) -> u64 {
353 let mut bytes = 0u64;
354 let _: Result<(), std::convert::Infallible> = self.visit_entries(|key, value| {
355 bytes = bytes.saturating_add(key.as_bytes().len() as u64 + value.len() as u64);
356 Ok(IndexStoreVisit::Continue)
357 });
358 bytes
359 }
360
361 #[cfg(feature = "diagnostics")]
363 pub(in crate::db) fn current_get_call_count() -> u64 {
364 INDEX_STORE_GET_CALL_COUNT.with(Cell::get)
365 }
366
367 #[cfg(feature = "diagnostics")]
369 pub(in crate::db) fn current_entry_read_count() -> u64 {
370 INDEX_STORE_ENTRY_READ_COUNT.with(Cell::get)
371 }
372
373 const fn bump_generation(&mut self) {
374 self.generation = self.generation.saturating_add(1);
375 }
376
377 #[cfg(test)]
378 #[must_use]
379 pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
380 match &self.backend {
381 IndexStoreBackend::Journaled { canonical: map, .. } => map.len(),
382 IndexStoreBackend::Heap(_) => 0,
383 }
384 }
385
386 fn journaled_get(
387 backend: &IndexStoreBackend,
388 key: &RawIndexStoreKey,
389 ) -> Option<IndexEntryValue> {
390 let IndexStoreBackend::Journaled {
391 canonical,
392 live,
393 tombstones,
394 } = backend
395 else {
396 return None;
397 };
398
399 if tombstones.contains(key) {
400 return None;
401 }
402 live.get(key).cloned().or_else(|| canonical.get(key))
403 }
404
405 pub(super) fn journaled_entries_snapshot_for_fold(
406 backend: &IndexStoreBackend,
407 ) -> HeapBTreeMap<RawIndexStoreKey, IndexEntryValue> {
408 #[cfg(test)]
409 record_journaled_snapshot_call();
410
411 let IndexStoreBackend::Journaled {
412 canonical,
413 live,
414 tombstones,
415 } = backend
416 else {
417 return HeapBTreeMap::new();
418 };
419
420 let mut entries = HeapBTreeMap::new();
421 for entry in canonical.iter() {
422 let key = entry.key().clone();
423 if !tombstones.contains(&key) {
424 entries.insert(key, entry.value());
425 }
426 }
427 for (key, value) in live {
428 if !tombstones.contains(key) {
429 entries.insert(key.clone(), value.clone());
430 }
431 }
432
433 entries
434 }
435
436 pub(super) fn visit_journaled_entries_in_range<E>(
437 &self,
438 bounds: (&Bound<RawIndexStoreKey>, &Bound<RawIndexStoreKey>),
439 direction: Direction,
440 mut visit: impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<bool, E>,
441 ) -> Result<(), E> {
442 let IndexStoreBackend::Journaled {
443 canonical,
444 live,
445 tombstones,
446 } = &self.backend
447 else {
448 return Ok(());
449 };
450
451 let lower = bounds.0.clone();
452 let upper = bounds.1.clone();
453 match direction {
454 Direction::Asc if canonical.is_empty() => {
455 for (key, value) in live.range((lower, upper)) {
456 if visit_index_store_entry(key, value, &mut visit)? {
457 return Ok(());
458 }
459 }
460 }
461 Direction::Desc if canonical.is_empty() => {
462 for (key, value) in live.range((lower, upper)).rev() {
463 if visit_index_store_entry(key, value, &mut visit)? {
464 return Ok(());
465 }
466 }
467 }
468 Direction::Asc if live.is_empty() && tombstones.is_empty() => {
469 for entry in canonical.range((lower, upper)) {
470 if visit_index_store_entry(entry.key(), &entry.value(), &mut visit)? {
471 return Ok(());
472 }
473 }
474 }
475 Direction::Desc if live.is_empty() && tombstones.is_empty() => {
476 for entry in canonical.range((lower, upper)).rev() {
477 if visit_index_store_entry(entry.key(), &entry.value(), &mut visit)? {
478 return Ok(());
479 }
480 }
481 }
482 Direction::Asc => {
483 visit_ordered_overlay(
484 canonical.range((lower.clone(), upper.clone())),
485 live.range((lower, upper)),
486 direction,
487 |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
488 |canonical_entry| !tombstones.contains(canonical_entry.key()),
489 |live_entry| !tombstones.contains(live_entry.0),
490 |entry| {
491 let should_stop = match entry {
492 OrderedOverlayEntry::Canonical(canonical_entry) => {
493 visit_index_store_entry(
494 canonical_entry.key(),
495 &canonical_entry.value(),
496 &mut visit,
497 )?
498 }
499 OrderedOverlayEntry::Live((key, value)) => {
500 visit_index_store_entry(key, value, &mut visit)?
501 }
502 };
503 Ok(if should_stop {
504 OrderedOverlayVisit::Stop
505 } else {
506 OrderedOverlayVisit::Continue
507 })
508 },
509 )?;
510 }
511 Direction::Desc => {
512 visit_ordered_overlay(
513 canonical.range((lower.clone(), upper.clone())).rev(),
514 live.range((lower, upper)).rev(),
515 direction,
516 |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
517 |canonical_entry| !tombstones.contains(canonical_entry.key()),
518 |live_entry| !tombstones.contains(live_entry.0),
519 |entry| {
520 let should_stop = match entry {
521 OrderedOverlayEntry::Canonical(canonical_entry) => {
522 visit_index_store_entry(
523 canonical_entry.key(),
524 &canonical_entry.value(),
525 &mut visit,
526 )?
527 }
528 OrderedOverlayEntry::Live((key, value)) => {
529 visit_index_store_entry(key, value, &mut visit)?
530 }
531 };
532 Ok(if should_stop {
533 OrderedOverlayVisit::Stop
534 } else {
535 OrderedOverlayVisit::Continue
536 })
537 },
538 )?;
539 }
540 }
541
542 Ok(())
543 }
544}
545
546#[cfg(test)]
547mod tests {
548 use super::*;
549 use crate::{db::direction::Direction, testing::test_memory, traits::Storable};
550 use std::{borrow::Cow, convert::Infallible};
551
552 fn raw_key(value: u8) -> RawIndexStoreKey {
553 <RawIndexStoreKey as Storable>::from_bytes(Cow::Owned(vec![value]))
554 }
555
556 #[cfg(feature = "diagnostics")]
557 #[test]
558 fn index_store_diagnostic_counters_record_gets_and_entry_reads() {
559 let mut store = IndexStore::init_heap();
560 store.insert(raw_key(7), IndexEntryValue::presence());
561 store.insert(raw_key(9), IndexEntryValue::presence());
562
563 let gets_before = IndexStore::current_get_call_count();
564 assert_eq!(store.get(&raw_key(7)), Some(IndexEntryValue::presence()));
565 assert_eq!(store.get(&raw_key(8)), None);
566
567 assert_eq!(
568 IndexStore::current_get_call_count().saturating_sub(gets_before),
569 2,
570 "diagnostic index-store get counter should count both hit and miss reads",
571 );
572
573 let entries_before = IndexStore::current_entry_read_count();
574 store
575 .visit_entries(|_key, _entry| Ok::<_, Infallible>(IndexStoreVisit::Continue))
576 .expect("index entry visit should succeed");
577
578 assert_eq!(
579 IndexStore::current_entry_read_count().saturating_sub(entries_before),
580 2,
581 "diagnostic index-store entry counter should count yielded traversal entries",
582 );
583 }
584
585 #[test]
586 fn journaled_mixed_index_range_traversal_streams_without_snapshot() {
587 let mut store = IndexStore::init_journaled(test_memory(93));
588 for value in [1_u8, 3, 5] {
589 store.insert(raw_key(value), IndexEntryValue::presence());
590 }
591 store
592 .fold_journaled_materialized_view()
593 .expect("canonical index seed should fold");
594
595 store.insert(raw_key(0), IndexEntryValue::presence());
596 store.insert(raw_key(4), IndexEntryValue::presence());
597 store.insert(raw_key(5), IndexEntryValue::presence());
598 store.remove(&raw_key(1));
599
600 let lower = Bound::Included(raw_key(0));
601 let upper = Bound::Included(raw_key(5));
602
603 reset_journaled_snapshot_call_count_for_tests();
604 let mut asc = Vec::new();
605 store
606 .visit_journaled_entries_in_range((&lower, &upper), Direction::Asc, |key, _value| {
607 asc.push(key.as_bytes()[0]);
608 Ok::<_, Infallible>(asc.len() == 2)
609 })
610 .expect("asc journaled index range traversal should succeed");
611 assert_eq!(asc, vec![0, 3]);
612 assert_eq!(
613 journaled_snapshot_call_count_for_tests(),
614 0,
615 "mixed journaled index range traversal should preserve early stop without materializing a snapshot",
616 );
617
618 reset_journaled_snapshot_call_count_for_tests();
619 let mut desc = Vec::new();
620 store
621 .visit_journaled_entries_in_range((&lower, &upper), Direction::Desc, |key, _value| {
622 desc.push(key.as_bytes()[0]);
623 Ok::<_, Infallible>(desc.len() == 2)
624 })
625 .expect("desc journaled index range traversal should succeed");
626 assert_eq!(desc, vec![5, 4]);
627 assert_eq!(
628 journaled_snapshot_call_count_for_tests(),
629 0,
630 "mixed reverse journaled index range traversal should preserve early stop without materializing a snapshot",
631 );
632 }
633}