use crate::db::{
direction::Direction,
index::{IndexEntryValue, key::RawIndexStoreKey},
ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
};
use candid::CandidType;
use ic_memory::stable_structures::{
BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
};
use serde::Deserialize;
#[cfg(test)]
use std::cell::Cell;
use std::collections::{BTreeMap as HeapBTreeMap, BTreeSet};
use std::ops::Bound;
#[cfg(test)]
thread_local! {
static JOURNALED_SNAPSHOT_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
}
#[cfg(test)]
fn record_journaled_snapshot_call() {
JOURNALED_SNAPSHOT_CALL_COUNT.with(|count| {
count.set(count.get().saturating_add(1));
});
}
#[cfg(test)]
fn reset_journaled_snapshot_call_count_for_tests() {
JOURNALED_SNAPSHOT_CALL_COUNT.with(|count| count.set(0));
}
#[cfg(test)]
fn journaled_snapshot_call_count_for_tests() -> u64 {
JOURNALED_SNAPSHOT_CALL_COUNT.with(Cell::get)
}
#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
pub enum IndexState {
Building,
#[default]
Ready,
Dropping,
}
impl IndexState {
#[must_use]
pub const fn as_str(self) -> &'static str {
match self {
Self::Building => "building",
Self::Ready => "ready",
Self::Dropping => "dropping",
}
}
}
pub struct IndexStore {
pub(super) backend: IndexStoreBackend,
generation: u64,
state: IndexState,
}
pub(super) enum IndexStoreBackend {
Stable(StableBTreeMap<RawIndexStoreKey, IndexEntryValue, VirtualMemory<DefaultMemoryImpl>>),
Heap(HeapBTreeMap<RawIndexStoreKey, IndexEntryValue>),
Journaled {
canonical:
StableBTreeMap<RawIndexStoreKey, IndexEntryValue, VirtualMemory<DefaultMemoryImpl>>,
live: HeapBTreeMap<RawIndexStoreKey, IndexEntryValue>,
tombstones: BTreeSet<RawIndexStoreKey>,
},
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) enum IndexStoreVisit {
Continue,
Stop,
}
impl IndexStoreVisit {
const fn should_stop(self) -> bool {
matches!(self, Self::Stop)
}
}
impl IndexStore {
#[must_use]
pub fn init(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
Self {
backend: IndexStoreBackend::Stable(StableBTreeMap::init(memory)),
generation: 0,
state: IndexState::Ready,
}
}
#[must_use]
pub const fn init_heap() -> Self {
Self {
backend: IndexStoreBackend::Heap(HeapBTreeMap::new()),
generation: 0,
state: IndexState::Ready,
}
}
#[must_use]
pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
Self {
backend: IndexStoreBackend::Journaled {
canonical: StableBTreeMap::init(memory),
live: HeapBTreeMap::new(),
tombstones: BTreeSet::new(),
},
generation: 0,
state: IndexState::Ready,
}
}
pub(in crate::db) fn visit_entries<E>(
&self,
mut visitor: impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<IndexStoreVisit, E>,
) -> Result<(), E> {
match &self.backend {
IndexStoreBackend::Stable(map) => {
for entry in map.iter() {
if visitor(entry.key(), &entry.value())?.should_stop() {
return Ok(());
}
}
}
IndexStoreBackend::Heap(map) => {
for (key, value) in map {
if visitor(key, value)?.should_stop() {
return Ok(());
}
}
}
IndexStoreBackend::Journaled {
canonical: _,
live: _,
tombstones: _,
} => self.visit_journaled_entries_in_range(
(&Bound::Unbounded, &Bound::Unbounded),
Direction::Asc,
|key, value| visitor(key, value).map(IndexStoreVisit::should_stop),
)?,
}
Ok(())
}
pub(in crate::db) fn get(&self, key: &RawIndexStoreKey) -> Option<IndexEntryValue> {
match &self.backend {
IndexStoreBackend::Stable(map) => map.get(key),
IndexStoreBackend::Heap(map) => map.get(key).cloned(),
IndexStoreBackend::Journaled { .. } => Self::journaled_get(&self.backend, key),
}
}
pub fn len(&self) -> u64 {
match &self.backend {
IndexStoreBackend::Stable(map) => map.len(),
IndexStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
IndexStoreBackend::Journaled { .. } => {
let mut count = 0_u64;
let _: Result<(), std::convert::Infallible> = self.visit_entries(|_key, _value| {
count = count.saturating_add(1);
Ok(IndexStoreVisit::Continue)
});
count
}
}
}
pub fn is_empty(&self) -> bool {
match &self.backend {
IndexStoreBackend::Stable(map) => map.is_empty(),
IndexStoreBackend::Heap(map) => map.is_empty(),
IndexStoreBackend::Journaled { .. } => {
let mut empty = true;
let _: Result<(), std::convert::Infallible> = self.visit_entries(|_key, _value| {
empty = false;
Ok(IndexStoreVisit::Stop)
});
empty
}
}
}
#[must_use]
pub(in crate::db) const fn generation(&self) -> u64 {
self.generation
}
#[must_use]
pub(in crate::db) const fn state(&self) -> IndexState {
self.state
}
pub(in crate::db) const fn mark_building(&mut self) {
self.state = IndexState::Building;
}
pub(in crate::db) const fn mark_ready(&mut self) {
self.state = IndexState::Ready;
}
pub(in crate::db) const fn mark_dropping(&mut self) {
self.state = IndexState::Dropping;
}
pub(crate) fn insert(
&mut self,
key: RawIndexStoreKey,
entry: IndexEntryValue,
) -> Option<IndexEntryValue> {
let previous_journaled = if matches!(self.backend, IndexStoreBackend::Journaled { .. }) {
self.get(&key)
} else {
None
};
let previous = match &mut self.backend {
IndexStoreBackend::Stable(map) => map.insert(key, entry),
IndexStoreBackend::Heap(map) => map.insert(key, entry),
IndexStoreBackend::Journaled {
live, tombstones, ..
} => {
tombstones.remove(&key);
live.insert(key, entry);
previous_journaled
}
};
self.bump_generation();
previous
}
pub(crate) fn remove(&mut self, key: &RawIndexStoreKey) -> Option<IndexEntryValue> {
let previous_journaled = if matches!(self.backend, IndexStoreBackend::Journaled { .. }) {
self.get(key)
} else {
None
};
let previous = match &mut self.backend {
IndexStoreBackend::Stable(map) => map.remove(key),
IndexStoreBackend::Heap(map) => map.remove(key),
IndexStoreBackend::Journaled {
live, tombstones, ..
} => {
live.remove(key);
tombstones.insert(key.clone());
previous_journaled
}
};
self.bump_generation();
previous
}
pub fn clear(&mut self) {
match &mut self.backend {
IndexStoreBackend::Stable(map) => map.clear_new(),
IndexStoreBackend::Heap(map) => map.clear(),
IndexStoreBackend::Journaled {
canonical,
live,
tombstones,
} => {
live.clear();
tombstones.clear();
for entry in canonical.iter() {
tombstones.insert(entry.key().clone());
}
}
}
self.bump_generation();
}
pub(in crate::db) fn fold_journaled_materialized_view(
&mut self,
) -> Result<(), crate::error::InternalError> {
let entries = Self::journaled_entries_snapshot_for_fold(&self.backend);
let IndexStoreBackend::Journaled {
canonical,
live,
tombstones,
} = &mut self.backend
else {
return Err(crate::error::InternalError::store_invariant(
"journal index fold requires a journaled index store",
));
};
canonical.clear_new();
for (key, value) in entries {
canonical.insert(key, value);
}
live.clear();
tombstones.clear();
self.bump_generation();
Ok(())
}
pub fn memory_bytes(&self) -> u64 {
let mut bytes = 0u64;
let _: Result<(), std::convert::Infallible> = self.visit_entries(|key, value| {
bytes = bytes.saturating_add(key.as_bytes().len() as u64 + value.len() as u64);
Ok(IndexStoreVisit::Continue)
});
bytes
}
const fn bump_generation(&mut self) {
self.generation = self.generation.saturating_add(1);
}
#[cfg(test)]
#[must_use]
pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
match &self.backend {
IndexStoreBackend::Stable(map)
| IndexStoreBackend::Journaled { canonical: map, .. } => map.len(),
IndexStoreBackend::Heap(_) => 0,
}
}
fn journaled_get(
backend: &IndexStoreBackend,
key: &RawIndexStoreKey,
) -> Option<IndexEntryValue> {
let IndexStoreBackend::Journaled {
canonical,
live,
tombstones,
} = backend
else {
return None;
};
if tombstones.contains(key) {
return None;
}
live.get(key).cloned().or_else(|| canonical.get(key))
}
pub(super) fn journaled_entries_snapshot_for_fold(
backend: &IndexStoreBackend,
) -> HeapBTreeMap<RawIndexStoreKey, IndexEntryValue> {
#[cfg(test)]
record_journaled_snapshot_call();
let IndexStoreBackend::Journaled {
canonical,
live,
tombstones,
} = backend
else {
return HeapBTreeMap::new();
};
let mut entries = HeapBTreeMap::new();
for entry in canonical.iter() {
let key = entry.key().clone();
if !tombstones.contains(&key) {
entries.insert(key, entry.value());
}
}
for (key, value) in live {
if !tombstones.contains(key) {
entries.insert(key.clone(), value.clone());
}
}
entries
}
pub(super) fn visit_journaled_entries_in_range<E>(
&self,
bounds: (&Bound<RawIndexStoreKey>, &Bound<RawIndexStoreKey>),
direction: Direction,
mut visit: impl FnMut(&RawIndexStoreKey, &IndexEntryValue) -> Result<bool, E>,
) -> Result<(), E> {
let IndexStoreBackend::Journaled {
canonical,
live,
tombstones,
} = &self.backend
else {
return Ok(());
};
let lower = bounds.0.clone();
let upper = bounds.1.clone();
match direction {
Direction::Asc if canonical.is_empty() => {
for (key, value) in live.range((lower, upper)) {
if visit(key, value)? {
return Ok(());
}
}
}
Direction::Desc if canonical.is_empty() => {
for (key, value) in live.range((lower, upper)).rev() {
if visit(key, value)? {
return Ok(());
}
}
}
Direction::Asc if live.is_empty() && tombstones.is_empty() => {
for entry in canonical.range((lower, upper)) {
if visit(entry.key(), &entry.value())? {
return Ok(());
}
}
}
Direction::Desc if live.is_empty() && tombstones.is_empty() => {
for entry in canonical.range((lower, upper)).rev() {
if visit(entry.key(), &entry.value())? {
return Ok(());
}
}
}
Direction::Asc => {
visit_ordered_overlay(
canonical.range((lower.clone(), upper.clone())),
live.range((lower, upper)),
direction,
|canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
|canonical_entry| !tombstones.contains(canonical_entry.key()),
|live_entry| !tombstones.contains(live_entry.0),
|entry| {
let should_stop = match entry {
OrderedOverlayEntry::Canonical(canonical_entry) => {
visit(canonical_entry.key(), &canonical_entry.value())?
}
OrderedOverlayEntry::Live((key, value)) => visit(key, value)?,
};
Ok(if should_stop {
OrderedOverlayVisit::Stop
} else {
OrderedOverlayVisit::Continue
})
},
)?;
}
Direction::Desc => {
visit_ordered_overlay(
canonical.range((lower.clone(), upper.clone())).rev(),
live.range((lower, upper)).rev(),
direction,
|canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
|canonical_entry| !tombstones.contains(canonical_entry.key()),
|live_entry| !tombstones.contains(live_entry.0),
|entry| {
let should_stop = match entry {
OrderedOverlayEntry::Canonical(canonical_entry) => {
visit(canonical_entry.key(), &canonical_entry.value())?
}
OrderedOverlayEntry::Live((key, value)) => visit(key, value)?,
};
Ok(if should_stop {
OrderedOverlayVisit::Stop
} else {
OrderedOverlayVisit::Continue
})
},
)?;
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{db::direction::Direction, testing::test_memory, traits::Storable};
use std::{borrow::Cow, convert::Infallible};
fn raw_key(value: u8) -> RawIndexStoreKey {
<RawIndexStoreKey as Storable>::from_bytes(Cow::Owned(vec![value]))
}
#[test]
fn journaled_mixed_index_range_traversal_streams_without_snapshot() {
let mut store = IndexStore::init_journaled(test_memory(93));
for value in [1_u8, 3, 5] {
store.insert(raw_key(value), IndexEntryValue::presence());
}
store
.fold_journaled_materialized_view()
.expect("canonical index seed should fold");
store.insert(raw_key(0), IndexEntryValue::presence());
store.insert(raw_key(4), IndexEntryValue::presence());
store.insert(raw_key(5), IndexEntryValue::presence());
store.remove(&raw_key(1));
let lower = Bound::Included(raw_key(0));
let upper = Bound::Included(raw_key(5));
reset_journaled_snapshot_call_count_for_tests();
let mut asc = Vec::new();
store
.visit_journaled_entries_in_range((&lower, &upper), Direction::Asc, |key, _value| {
asc.push(key.as_bytes()[0]);
Ok::<_, Infallible>(asc.len() == 2)
})
.expect("asc journaled index range traversal should succeed");
assert_eq!(asc, vec![0, 3]);
assert_eq!(
journaled_snapshot_call_count_for_tests(),
0,
"mixed journaled index range traversal should preserve early stop without materializing a snapshot",
);
reset_journaled_snapshot_call_count_for_tests();
let mut desc = Vec::new();
store
.visit_journaled_entries_in_range((&lower, &upper), Direction::Desc, |key, _value| {
desc.push(key.as_bytes()[0]);
Ok::<_, Infallible>(desc.len() == 2)
})
.expect("desc journaled index range traversal should succeed");
assert_eq!(desc, vec![5, 4]);
assert_eq!(
journaled_snapshot_call_count_for_tests(),
0,
"mixed reverse journaled index range traversal should preserve early stop without materializing a snapshot",
);
}
}