1use crate::db::{
7 direction::Direction,
8 index::{IndexEntryValue, key::RawIndexStoreKey},
9 ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
10};
11
12use candid::CandidType;
13use ic_memory::stable_structures::{
14 BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
15};
16use serde::Deserialize;
17#[cfg(test)]
18use std::cell::Cell;
19use std::collections::{BTreeMap as HeapBTreeMap, BTreeSet};
20use std::ops::Bound;
21
// Test-only, per-thread counter of how many times
// `record_journaled_snapshot_call` has run. It starts at zero and is read and
// reset through the `*_for_tests` helpers in this file.
#[cfg(test)]
thread_local! {
    static JOURNALED_SNAPSHOT_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
}
26
/// Test-only hook: adds one to this thread's snapshot call counter,
/// saturating at `u64::MAX` instead of wrapping.
#[cfg(test)]
fn record_journaled_snapshot_call() {
    JOURNALED_SNAPSHOT_CALL_COUNT.with(|counter| {
        let bumped = counter.get().saturating_add(1);
        counter.set(bumped);
    });
}
33
/// Test-only helper: puts this thread's snapshot call counter back to zero.
#[cfg(test)]
fn reset_journaled_snapshot_call_count_for_tests() {
    JOURNALED_SNAPSHOT_CALL_COUNT.set(0);
}
38
/// Test-only helper: returns the current value of this thread's snapshot
/// call counter.
#[cfg(test)]
fn journaled_snapshot_call_count_for_tests() -> u64 {
    JOURNALED_SNAPSHOT_CALL_COUNT.get()
}
43
/// State tag carried by an [`IndexStore`]; `Ready` is the default variant.
///
/// NOTE(review): the variant names suggest build / serve / drop lifecycle
/// phases, but this file only stores and returns the tag — the code that
/// drives the transitions lives elsewhere; confirm the semantics there.
#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
pub enum IndexState {
    /// Set by `IndexStore::mark_building`.
    Building,
    /// Default variant; also the state both `IndexStore` constructors start in.
    #[default]
    Ready,
    /// Set by `IndexStore::mark_dropping`.
    Dropping,
}
58
59impl IndexState {
60 #[must_use]
62 pub const fn as_str(self) -> &'static str {
63 match self {
64 Self::Building => "building",
65 Self::Ready => "ready",
66 Self::Dropping => "dropping",
67 }
68 }
69}
70
/// Key/value store for raw index entries, backed by either a plain heap map
/// or a journaled stable-memory map (see `IndexStoreBackend`).
pub struct IndexStore {
    /// Storage backend holding the entries.
    pub(super) backend: IndexStoreBackend,
    /// Mutation counter: bumped (saturating) by `insert`, `remove`, `clear`
    /// and a successful `fold_journaled_materialized_view`.
    generation: u64,
    /// Current state tag; changed only through the `mark_*` methods.
    state: IndexState,
}
84
/// Storage strategies available to an [`IndexStore`].
pub(super) enum IndexStoreBackend {
    /// All entries live in a single in-heap ordered map.
    Heap(HeapBTreeMap<RawIndexStoreKey, IndexEntryValue>),
    /// Stable-memory map plus in-heap pending changes.
    ///
    /// Reads consult `tombstones` first (a tombstoned key is absent), then
    /// `live`, then `canonical`; `fold_journaled_materialized_view` writes the
    /// merged view back into `canonical` and clears the other two.
    Journaled {
        /// Entries persisted in the stable-structures B-tree.
        canonical:
            StableBTreeMap<RawIndexStoreKey, IndexEntryValue, VirtualMemory<DefaultMemoryImpl>>,
        /// Inserted/overwritten entries not yet folded; they shadow `canonical`.
        live: HeapBTreeMap<RawIndexStoreKey, IndexEntryValue>,
        /// Keys removed since the last fold; they hide matching entries.
        tombstones: BTreeSet<RawIndexStoreKey>,
    },
}
94
/// Verdict returned by a visitor passed to `IndexStore::visit_entries`,
/// telling the traversal whether to keep going.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) enum IndexStoreVisit {
    /// Keep visiting subsequent entries.
    Continue,
    /// End the traversal after the current entry.
    Stop,
}
101
102impl IndexStoreVisit {
103 const fn should_stop(self) -> bool {
104 matches!(self, Self::Stop)
105 }
106}
107
108impl IndexStore {
109 #[must_use]
111 pub const fn init_heap() -> Self {
112 Self {
113 backend: IndexStoreBackend::Heap(HeapBTreeMap::new()),
114 generation: 0,
115 state: IndexState::Ready,
116 }
117 }
118
119 #[must_use]
124 pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
125 Self {
126 backend: IndexStoreBackend::Journaled {
127 canonical: StableBTreeMap::init(memory),
128 live: HeapBTreeMap::new(),
129 tombstones: BTreeSet::new(),
130 },
131 generation: 0,
132 state: IndexState::Ready,
133 }
134 }
135
136 pub(in crate::db) fn visit_entries<E>(
139 &self,
140 mut visitor: impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<IndexStoreVisit, E>,
141 ) -> Result<(), E> {
142 match &self.backend {
143 IndexStoreBackend::Heap(map) => {
144 for (key, value) in map {
145 if visitor(key, value)?.should_stop() {
146 return Ok(());
147 }
148 }
149 }
150 IndexStoreBackend::Journaled {
151 canonical: _,
152 live: _,
153 tombstones: _,
154 } => self.visit_journaled_entries_in_range(
155 (&Bound::Unbounded, &Bound::Unbounded),
156 Direction::Asc,
157 |key, value| visitor(key, value).map(IndexStoreVisit::should_stop),
158 )?,
159 }
160
161 Ok(())
162 }
163
164 pub(in crate::db) fn get(&self, key: &RawIndexStoreKey) -> Option<IndexEntryValue> {
165 match &self.backend {
166 IndexStoreBackend::Heap(map) => map.get(key).cloned(),
167 IndexStoreBackend::Journaled { .. } => Self::journaled_get(&self.backend, key),
168 }
169 }
170
171 pub fn len(&self) -> u64 {
172 match &self.backend {
173 IndexStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
174 IndexStoreBackend::Journaled { .. } => {
175 let mut count = 0_u64;
176 let _: Result<(), std::convert::Infallible> = self.visit_entries(|_key, _value| {
177 count = count.saturating_add(1);
178 Ok(IndexStoreVisit::Continue)
179 });
180 count
181 }
182 }
183 }
184
185 pub fn is_empty(&self) -> bool {
186 match &self.backend {
187 IndexStoreBackend::Heap(map) => map.is_empty(),
188 IndexStoreBackend::Journaled { .. } => {
189 let mut empty = true;
190 let _: Result<(), std::convert::Infallible> = self.visit_entries(|_key, _value| {
191 empty = false;
192 Ok(IndexStoreVisit::Stop)
193 });
194 empty
195 }
196 }
197 }
198
    /// Returns the current mutation counter (see `bump_generation`).
    #[must_use]
    pub(in crate::db) const fn generation(&self) -> u64 {
        self.generation
    }
203
    /// Returns the store's current [`IndexState`] tag.
    #[must_use]
    pub(in crate::db) const fn state(&self) -> IndexState {
        self.state
    }
209
    /// Sets the state tag to `IndexState::Building`; entries and generation
    /// are left untouched.
    pub(in crate::db) const fn mark_building(&mut self) {
        self.state = IndexState::Building;
    }
215
    /// Sets the state tag to `IndexState::Ready`; entries and generation are
    /// left untouched.
    pub(in crate::db) const fn mark_ready(&mut self) {
        self.state = IndexState::Ready;
    }
220
    /// Sets the state tag to `IndexState::Dropping`; entries and generation
    /// are left untouched.
    pub(in crate::db) const fn mark_dropping(&mut self) {
        self.state = IndexState::Dropping;
    }
225
226 pub(crate) fn insert(
227 &mut self,
228 key: RawIndexStoreKey,
229 entry: IndexEntryValue,
230 ) -> Option<IndexEntryValue> {
231 let previous_journaled = if matches!(self.backend, IndexStoreBackend::Journaled { .. }) {
232 self.get(&key)
233 } else {
234 None
235 };
236 let previous = match &mut self.backend {
237 IndexStoreBackend::Heap(map) => map.insert(key, entry),
238 IndexStoreBackend::Journaled {
239 live, tombstones, ..
240 } => {
241 tombstones.remove(&key);
242 live.insert(key, entry);
243 previous_journaled
244 }
245 };
246 self.bump_generation();
247 previous
248 }
249
250 pub(crate) fn remove(&mut self, key: &RawIndexStoreKey) -> Option<IndexEntryValue> {
251 let previous_journaled = if matches!(self.backend, IndexStoreBackend::Journaled { .. }) {
252 self.get(key)
253 } else {
254 None
255 };
256 let previous = match &mut self.backend {
257 IndexStoreBackend::Heap(map) => map.remove(key),
258 IndexStoreBackend::Journaled {
259 live, tombstones, ..
260 } => {
261 live.remove(key);
262 tombstones.insert(key.clone());
263 previous_journaled
264 }
265 };
266 self.bump_generation();
267 previous
268 }
269
270 pub fn clear(&mut self) {
271 match &mut self.backend {
272 IndexStoreBackend::Heap(map) => map.clear(),
273 IndexStoreBackend::Journaled {
274 canonical,
275 live,
276 tombstones,
277 } => {
278 live.clear();
279 tombstones.clear();
280 for entry in canonical.iter() {
281 tombstones.insert(entry.key().clone());
282 }
283 }
284 }
285 self.bump_generation();
286 }
287
288 pub(in crate::db) fn fold_journaled_materialized_view(
291 &mut self,
292 ) -> Result<(), crate::error::InternalError> {
293 let entries = Self::journaled_entries_snapshot_for_fold(&self.backend);
294 let IndexStoreBackend::Journaled {
295 canonical,
296 live,
297 tombstones,
298 } = &mut self.backend
299 else {
300 return Err(crate::error::InternalError::store_invariant());
301 };
302
303 canonical.clear_new();
304 for (key, value) in entries {
305 canonical.insert(key, value);
306 }
307 live.clear();
308 tombstones.clear();
309 self.bump_generation();
310
311 Ok(())
312 }
313
314 pub fn memory_bytes(&self) -> u64 {
316 let mut bytes = 0u64;
317 let _: Result<(), std::convert::Infallible> = self.visit_entries(|key, value| {
318 bytes = bytes.saturating_add(key.as_bytes().len() as u64 + value.len() as u64);
319 Ok(IndexStoreVisit::Continue)
320 });
321 bytes
322 }
323
    /// Advances the generation counter by one; the counter saturates at
    /// `u64::MAX` rather than wrapping.
    const fn bump_generation(&mut self) {
        self.generation = self.generation.saturating_add(1);
    }
327
328 #[cfg(test)]
329 #[must_use]
330 pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
331 match &self.backend {
332 IndexStoreBackend::Journaled { canonical: map, .. } => map.len(),
333 IndexStoreBackend::Heap(_) => 0,
334 }
335 }
336
337 fn journaled_get(
338 backend: &IndexStoreBackend,
339 key: &RawIndexStoreKey,
340 ) -> Option<IndexEntryValue> {
341 let IndexStoreBackend::Journaled {
342 canonical,
343 live,
344 tombstones,
345 } = backend
346 else {
347 return None;
348 };
349
350 if tombstones.contains(key) {
351 return None;
352 }
353 live.get(key).cloned().or_else(|| canonical.get(key))
354 }
355
356 pub(super) fn journaled_entries_snapshot_for_fold(
357 backend: &IndexStoreBackend,
358 ) -> HeapBTreeMap<RawIndexStoreKey, IndexEntryValue> {
359 #[cfg(test)]
360 record_journaled_snapshot_call();
361
362 let IndexStoreBackend::Journaled {
363 canonical,
364 live,
365 tombstones,
366 } = backend
367 else {
368 return HeapBTreeMap::new();
369 };
370
371 let mut entries = HeapBTreeMap::new();
372 for entry in canonical.iter() {
373 let key = entry.key().clone();
374 if !tombstones.contains(&key) {
375 entries.insert(key, entry.value());
376 }
377 }
378 for (key, value) in live {
379 if !tombstones.contains(key) {
380 entries.insert(key.clone(), value.clone());
381 }
382 }
383
384 entries
385 }
386
387 pub(super) fn visit_journaled_entries_in_range<E>(
388 &self,
389 bounds: (&Bound<RawIndexStoreKey>, &Bound<RawIndexStoreKey>),
390 direction: Direction,
391 mut visit: impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<bool, E>,
392 ) -> Result<(), E> {
393 let IndexStoreBackend::Journaled {
394 canonical,
395 live,
396 tombstones,
397 } = &self.backend
398 else {
399 return Ok(());
400 };
401
402 let lower = bounds.0.clone();
403 let upper = bounds.1.clone();
404 match direction {
405 Direction::Asc if canonical.is_empty() => {
406 for (key, value) in live.range((lower, upper)) {
407 if visit(key, value)? {
408 return Ok(());
409 }
410 }
411 }
412 Direction::Desc if canonical.is_empty() => {
413 for (key, value) in live.range((lower, upper)).rev() {
414 if visit(key, value)? {
415 return Ok(());
416 }
417 }
418 }
419 Direction::Asc if live.is_empty() && tombstones.is_empty() => {
420 for entry in canonical.range((lower, upper)) {
421 if visit(entry.key(), &entry.value())? {
422 return Ok(());
423 }
424 }
425 }
426 Direction::Desc if live.is_empty() && tombstones.is_empty() => {
427 for entry in canonical.range((lower, upper)).rev() {
428 if visit(entry.key(), &entry.value())? {
429 return Ok(());
430 }
431 }
432 }
433 Direction::Asc => {
434 visit_ordered_overlay(
435 canonical.range((lower.clone(), upper.clone())),
436 live.range((lower, upper)),
437 direction,
438 |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
439 |canonical_entry| !tombstones.contains(canonical_entry.key()),
440 |live_entry| !tombstones.contains(live_entry.0),
441 |entry| {
442 let should_stop = match entry {
443 OrderedOverlayEntry::Canonical(canonical_entry) => {
444 visit(canonical_entry.key(), &canonical_entry.value())?
445 }
446 OrderedOverlayEntry::Live((key, value)) => visit(key, value)?,
447 };
448 Ok(if should_stop {
449 OrderedOverlayVisit::Stop
450 } else {
451 OrderedOverlayVisit::Continue
452 })
453 },
454 )?;
455 }
456 Direction::Desc => {
457 visit_ordered_overlay(
458 canonical.range((lower.clone(), upper.clone())).rev(),
459 live.range((lower, upper)).rev(),
460 direction,
461 |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
462 |canonical_entry| !tombstones.contains(canonical_entry.key()),
463 |live_entry| !tombstones.contains(live_entry.0),
464 |entry| {
465 let should_stop = match entry {
466 OrderedOverlayEntry::Canonical(canonical_entry) => {
467 visit(canonical_entry.key(), &canonical_entry.value())?
468 }
469 OrderedOverlayEntry::Live((key, value)) => visit(key, value)?,
470 };
471 Ok(if should_stop {
472 OrderedOverlayVisit::Stop
473 } else {
474 OrderedOverlayVisit::Continue
475 })
476 },
477 )?;
478 }
479 }
480
481 Ok(())
482 }
483}
484
// Unit tests for the journaled backend's streaming range traversal.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{db::direction::Direction, testing::test_memory, traits::Storable};
    use std::{borrow::Cow, convert::Infallible};

    // Builds a one-byte raw key so expected orderings are easy to read.
    fn raw_key(value: u8) -> RawIndexStoreKey {
        <RawIndexStoreKey as Storable>::from_bytes(Cow::Owned(vec![value]))
    }

    // With both canonical and pending (live + tombstone) state present, a
    // range walk must honour early stop in both directions and must not call
    // the snapshot helper.
    #[test]
    fn journaled_mixed_index_range_traversal_streams_without_snapshot() {
        // Seed keys 1, 3, 5 and fold them so they end up in the canonical map.
        let mut store = IndexStore::init_journaled(test_memory(93));
        for value in [1_u8, 3, 5] {
            store.insert(raw_key(value), IndexEntryValue::presence());
        }
        store
            .fold_journaled_materialized_view()
            .expect("canonical index seed should fold");

        // Pending changes: new keys 0 and 4, an overwrite of 5, a removal of 1.
        // Visible keys are therefore 0, 3, 4, 5.
        store.insert(raw_key(0), IndexEntryValue::presence());
        store.insert(raw_key(4), IndexEntryValue::presence());
        store.insert(raw_key(5), IndexEntryValue::presence());
        store.remove(&raw_key(1));

        let lower = Bound::Included(raw_key(0));
        let upper = Bound::Included(raw_key(5));

        // Ascending walk that asks to stop after two entries.
        reset_journaled_snapshot_call_count_for_tests();
        let mut asc = Vec::new();
        store
            .visit_journaled_entries_in_range((&lower, &upper), Direction::Asc, |key, _value| {
                asc.push(key.as_bytes()[0]);
                Ok::<_, Infallible>(asc.len() == 2)
            })
            .expect("asc journaled index range traversal should succeed");
        assert_eq!(asc, vec![0, 3]);
        assert_eq!(
            journaled_snapshot_call_count_for_tests(),
            0,
            "mixed journaled index range traversal should preserve early stop without materializing a snapshot",
        );

        // Descending walk that asks to stop after two entries.
        reset_journaled_snapshot_call_count_for_tests();
        let mut desc = Vec::new();
        store
            .visit_journaled_entries_in_range((&lower, &upper), Direction::Desc, |key, _value| {
                desc.push(key.as_bytes()[0]);
                Ok::<_, Infallible>(desc.len() == 2)
            })
            .expect("desc journaled index range traversal should succeed");
        assert_eq!(desc, vec![5, 4]);
        assert_eq!(
            journaled_snapshot_call_count_for_tests(),
            0,
            "mixed reverse journaled index range traversal should preserve early stop without materializing a snapshot",
        );
    }
}