use std::mem::size_of;
use std::ops::Bound;
use std::sync::Arc;
use crate::Key;
use crate::byte_view::ByteView;
use crate::cache::{BlockCache, BlockKey};
use crate::compaction::{CompactionIndex, compact_shard};
use crate::config::Config;
use crate::disk_loc::DiskLoc;
use crate::engine::Engine;
use crate::error::{DbError, DbResult};
use crate::hook::{NoHook, WriteHook};
use crate::io::aligned_buf::AlignedBuf;
use crate::recovery::recover_var_tree;
use crate::shard::ShardInner;
use crate::skiplist::node::{SkipNode, VarNode, random_height};
use crate::skiplist::{InsertResult, SkipList};
use crate::sync::MutexGuard;
/// Maximum number of attempts `read_value_cached` makes when a read reports
/// `DbError::StaleDiskLoc` before it gives up and returns `None`.
const MAX_STALE_RETRIES: usize = 3;
/// Ordered key-value tree that keeps its keys in an in-memory skip list and
/// resolves values through on-disk locations (`DiskLoc`).
///
/// `H` is the write hook notified by the mutating operations; it defaults to
/// `NoHook`.
pub struct VarTree<K: Key, H: WriteHook<K> = NoHook> {
/// Skip-list index; each node carries the key and its current `DiskLoc`.
index: SkipList<VarNode<K>>,
/// Storage engine owning the shards that entries are appended to.
engine: Engine,
/// Cache of 4096-byte-aligned blocks used by the read paths.
cache: BlockCache,
/// Threshold forwarded to `compact_shard` by `compact`.
compaction_threshold: f64,
/// Number of leading key bits hashed by `shard_for`; 0 hashes the whole key.
shard_prefix_bits: usize,
/// Copied from `Config::reversed`; passed to `SkipList::new` and to iterators.
reversed: bool,
/// Hook invoked by the write paths (`on_write`) and by `migrate` /
/// `replay_init` (`on_init`).
hook: H,
}
impl<K: Key> VarTree<K> {
/// Opens a tree stored at `path` using `config` and the default `NoHook`.
///
/// # Errors
/// Propagates any error returned by `open_inner`.
pub fn open(path: impl AsRef<std::path::Path>, config: Config) -> DbResult<Self> {
Self::open_inner(path, config, NoHook)
}
}
impl<K: Key, H: WriteHook<K>> VarTree<K, H> {
/// Opens a tree stored at `path` using `config`, installing `hook` as the
/// write hook.
///
/// # Errors
/// Propagates any error returned by `open_inner`.
pub fn open_hooked(
path: impl AsRef<std::path::Path>,
config: Config,
hook: H,
) -> DbResult<Self> {
Self::open_inner(path, config, hook)
}
/// Shared constructor behind `open` and `open_hooked`.
///
/// Opens the engine, rebuilds the in-memory index with `recover_var_tree`,
/// raises the engine's GSN counter to at least one past the highest recovered
/// GSN and, when hints are enabled, records the key length on every shard.
///
/// # Errors
/// Returns any error produced by `Engine::open` or `recover_var_tree`.
fn open_inner(path: impl AsRef<std::path::Path>, config: Config, hook: H) -> DbResult<Self> {
    use std::sync::atomic::Ordering;

    // `config` is moved into `Engine::open`, so everything the tree itself
    // needs is read out of it first.
    let compaction_threshold = config.compaction_threshold;
    let shard_prefix_bits = config.shard_prefix_bits;
    let reversed = config.reversed;
    let cache = BlockCache::new(&config.cache);
    let engine = Engine::open(path, config)?;

    let tree = Self {
        index: SkipList::new(reversed),
        engine,
        cache,
        compaction_threshold,
        shard_prefix_bits,
        reversed,
        hook,
    };

    // Rebuild the index from the shard directories.
    let shard_dirs = tree.engine.shard_dirs();
    let shard_dir_refs = Engine::shard_dir_refs(&shard_dirs);
    let shard_ids = tree.engine.shard_ids();
    let hints = tree.engine.hints();
    let max_gsn = recover_var_tree::<K>(
        &shard_dir_refs,
        &shard_ids,
        tree.index(),
        hints,
        #[cfg(feature = "encryption")]
        tree.engine.cipher(),
    )?;

    // New writes must be numbered after everything that was recovered.
    tree.engine.gsn().fetch_max(max_gsn + 1, Ordering::Relaxed);

    if hints {
        let key_len = size_of::<K>();
        tree.engine.shards().iter().for_each(|shard| {
            shard.set_key_len(key_len);
        });
    }

    tracing::info!(
        key_size = size_of::<K>(),
        entries = tree.len(),
        "var_tree recovered"
    );
    Ok(tree)
}
/// Consumes the tree, persisting hint data first when hints are enabled and
/// then flushing the engine.
///
/// # Errors
/// Returns the first error from `sync_hints` or `Engine::flush`.
pub fn close(self) -> DbResult<()> {
    let hints_enabled = self.engine.hints();
    if hints_enabled {
        self.sync_hints()?;
    }
    let flushed = self.engine.flush();
    flushed
}
/// Delegates to `Engine::flush_buffers`.
///
/// # Errors
/// Returns whatever error the engine reports.
pub fn flush_buffers(&self) -> DbResult<()> {
self.engine.flush_buffers()
}
/// Returns the configuration held by the underlying engine.
pub fn config(&self) -> &Config {
self.engine.config()
}
}
impl<K: Key, H: WriteHook<K>> CompactionIndex<K> for VarTree<K, H> {
/// Re-points `key` from `old_loc` to `new_loc`, but only if the index still
/// references `old_loc`.
///
/// Returns `true` when the location was swapped. Returns `false` when the
/// key is not indexed, when it already points somewhere else, or when the
/// compare-exchange loses to a concurrent update.
fn update_if_match(&self, key: &K, old_loc: DiskLoc, new_loc: DiskLoc) -> bool {
    let guard = self.index.collector().enter();
    let Some(node) = self.index.get(key.as_bytes(), &guard) else {
        return false;
    };

    let observed_ptr = node.load_disk_ptr();
    // SAFETY (assumed): the pointer was just loaded from a node reached while
    // the collector guard is held, so the boxed `DiskLoc` should still be
    // alive — confirm against the skip list's retire protocol.
    let observed = unsafe { *observed_ptr };
    if !(observed == old_loc) {
        return false;
    }

    let replacement = Box::into_raw(Box::new(new_loc));
    match node.compare_exchange_disk(observed_ptr, replacement) {
        Ok(previous) => {
            // The old allocation may still be visible to readers; hand it to
            // the collector instead of freeing it immediately.
            unsafe {
                self.index
                    .collector()
                    .retire(previous, seize::reclaim::boxed::<DiskLoc>);
            }
            true
        }
        Err(_) => {
            // The replacement was never published, so it can be freed directly.
            unsafe {
                drop(Box::from_raw(replacement));
            }
            false
        }
    }
}
/// Forwards to `BlockCache::invalidate_file` for the given shard/file pair.
fn invalidate_blocks(&self, shard_id: u8, file_id: u32, total_bytes: u64) {
self.cache.invalidate_file(shard_id, file_id, total_bytes);
}
/// Reports whether `key` is currently indexed (delegates to `contains`).
fn contains_key(&self, key: &K) -> bool {
self.contains(key)
}
}
impl<K: Key, H: WriteHook<K>> VarTree<K, H> {
/// Runs `compact_shard` on every shard in order and returns the sum of the
/// per-shard results.
///
/// # Errors
/// Stops at, and returns, the first error reported by `compact_shard`.
pub fn compact(&self) -> DbResult<usize> {
    let threshold = self.compaction_threshold;
    self.engine
        .shards()
        .iter()
        .try_fold(0, |compacted_so_far, shard| -> DbResult<usize> {
            Ok(compacted_so_far + compact_shard(shard, self, threshold)?)
        })
}
/// Looks up `key` and returns its value, or `None` when the key is not
/// indexed or its value could not be read.
pub fn get(&self, key: &K) -> Option<ByteView> {
    metrics::counter!("armdb.ops", "op" => "get", "tree" => "var_tree").increment(1);
    #[cfg(feature = "hot-path-tracing")]
    tracing::trace!("var_tree.get");

    let guard = self.index.collector().enter();
    let Some(node) = self.index.get(key.as_bytes(), &guard) else {
        #[cfg(feature = "hot-path-tracing")]
        tracing::error!(
            "VarTree get error: index.get returned None for key {:?}",
            key.as_bytes()
        );
        return None;
    };
    self.read_value_cached(node, &guard)
}
/// Like `get`, but maps a missing value to `DbError::KeyNotFound`.
///
/// # Errors
/// Returns `DbError::KeyNotFound` whenever `get` returns `None`.
pub fn get_or_err(&self, key: &K) -> DbResult<ByteView> {
self.get(key).ok_or(DbError::KeyNotFound)
}
/// Stores `value` under `key`, replacing any previous value.
///
/// The key's shard lock is held for the whole operation. When the hook asks
/// for it (`H::NEEDS_OLD_VALUE`), the previous value is read before the write
/// and passed to `on_write` together with the new value.
///
/// # Errors
/// Propagates the error from `put_locked`; the hook is not called in that case.
pub fn put(&self, key: &K, value: &[u8]) -> DbResult<()> {
    metrics::counter!("armdb.ops", "op" => "put", "tree" => "var_tree").increment(1);
    #[cfg(feature = "hot-path-tracing")]
    tracing::trace!("var_tree.put");

    let shard_id = self.shard_for(key);
    let mut inner = self.engine.shards()[shard_id].lock();
    let guard = self.index.collector().enter();

    // Only pay for reading the previous value when the hook wants it.
    let previous = if H::NEEDS_OLD_VALUE {
        self.index.get(key.as_bytes(), &guard).and_then(|node| {
            let disk = *node.load_disk();
            self.read_value_locked(&disk, &inner)
        })
    } else {
        None
    };

    self.put_locked(shard_id, &mut inner, &guard, key, value)?;
    self.hook.on_write(key, previous.as_deref(), Some(value));
    Ok(())
}
/// Stores `value` under `key` only if the key is not present yet.
///
/// On success the hook's `on_write` is called with no old value.
///
/// # Errors
/// Propagates the error from `insert_locked`, which returns
/// `DbError::KeyExists` when the key is already indexed.
pub fn insert(&self, key: &K, value: &[u8]) -> DbResult<()> {
metrics::counter!("armdb.ops", "op" => "insert", "tree" => "var_tree").increment(1);
#[cfg(feature = "hot-path-tracing")]
tracing::trace!("var_tree.insert");
let shard_id = self.shard_for(key);
// The shard lock is taken before the collector guard, as in the other write paths.
let mut inner = self.engine.shards()[shard_id].lock();
let guard = self.index.collector().enter();
self.insert_locked(shard_id, &mut inner, &guard, key, value)?;
self.hook.on_write(key, None, Some(value));
Ok(())
}
/// Removes `key`, returning `true` when an entry was actually removed.
///
/// When the hook requests it (`H::NEEDS_OLD_VALUE`), the value is read before
/// the removal. `on_write` is called with a `None` new value only if the key
/// existed.
///
/// # Errors
/// Propagates the error from `delete_locked`.
pub fn delete(&self, key: &K) -> DbResult<bool> {
    metrics::counter!("armdb.ops", "op" => "delete", "tree" => "var_tree").increment(1);
    #[cfg(feature = "hot-path-tracing")]
    tracing::trace!("var_tree.delete");

    let shard_id = self.shard_for(key);
    let mut inner = self.engine.shards()[shard_id].lock();
    let guard = self.index.collector().enter();

    let previous = if H::NEEDS_OLD_VALUE {
        self.index.get(key.as_bytes(), &guard).and_then(|node| {
            let disk = *node.load_disk();
            self.read_value_locked(&disk, &inner)
        })
    } else {
        None
    };

    let removed = self.delete_locked(shard_id, &mut inner, &guard, key)?;
    if removed {
        self.hook.on_write(key, previous.as_deref(), None);
    }
    Ok(removed)
}
/// Runs `f` against a `VarShard` that holds the lock of the shard owning
/// `shard_key`, so every operation performed through it happens under one
/// lock acquisition.
///
/// Returns whatever `f` returns. The lock and collector guard are released
/// when the `VarShard` is dropped at the end of this call.
pub fn atomic<R>(
    &self,
    shard_key: &K,
    f: impl FnOnce(&mut VarShard<'_, K, H>) -> DbResult<R>,
) -> DbResult<R> {
    let shard_id = self.shard_for(shard_key);
    // Field initialisers run in written order: the shard lock is taken
    // before the collector guard is entered.
    let mut scope = VarShard {
        tree: self,
        inner: self.engine.shards()[shard_id].lock(),
        shard_id,
        guard: self.index.collector().enter(),
    };
    f(&mut scope)
}
/// Appends `value` for `key` to the shard log and points the index at the new
/// entry, inserting a node when the key is not indexed yet.
///
/// The caller supplies the locked shard state (`inner`) and a collector
/// guard. When an existing node is updated, the entry it used to reference is
/// accounted as dead bytes and the old `DiskLoc` allocation is retired
/// through the collector. This function does not invoke the write hook.
///
/// # Errors
/// Propagates the error from `append_entry`.
fn put_locked(
&self,
shard_id: usize,
inner: &mut ShardInner,
guard: &seize::LocalGuard<'_>,
key: &K,
value: &[u8],
) -> DbResult<()> {
let (disk_loc, _gsn) = inner.append_entry(shard_id as u8, key.as_bytes(), value, false)?;
// Key already indexed: swap the new location into the existing node.
if let Some(existing) = self.index.get(key.as_bytes(), guard) {
let new_disk = Box::into_raw(Box::new(disk_loc));
let old_disk_ptr = existing.swap_disk(new_disk);
let old_disk = unsafe { *old_disk_ptr };
// The superseded entry is now garbage for compaction purposes.
inner.add_dead_bytes(
old_disk.file_id as u32,
crate::entry::entry_size(size_of::<K>(), old_disk.len),
);
// Retire rather than free: readers may still be using the old pointer.
unsafe {
self.index
.collector()
.retire(old_disk_ptr, seize::reclaim::boxed::<DiskLoc>);
}
return Ok(());
}
// Key not found above: allocate a fresh node and try to link it in.
let height = random_height();
let node_ptr = VarNode::alloc(*key, disk_loc, height);
match self.index.insert(node_ptr, guard) {
InsertResult::Inserted => {}
// `insert` found a node for this key after all: update that node and
// discard the one allocated above.
InsertResult::Exists(existing) => {
let new_disk = Box::into_raw(Box::new(disk_loc));
let old_disk_ptr = existing.swap_disk(new_disk);
let old_disk = unsafe { *old_disk_ptr };
inner.add_dead_bytes(
old_disk.file_id as u32,
crate::entry::entry_size(size_of::<K>(), old_disk.len),
);
unsafe {
self.index
.collector()
.retire(old_disk_ptr, seize::reclaim::boxed::<DiskLoc>);
}
// NOTE(review): the unlinked node's disk pointer is nulled before
// `dealloc_node`, whereas `insert_locked` deallocates without nulling.
// If `VarNode::alloc` boxed its own `DiskLoc`, nulling here may leak
// that box — confirm against `VarNode::dealloc_node`.
unsafe {
(*node_ptr)
.disk
.store(std::ptr::null_mut(), std::sync::atomic::Ordering::Relaxed);
VarNode::<K>::dealloc_node(node_ptr);
}
}
}
Ok(())
}
/// Appends `value` for `key` and links a new index node, failing when the key
/// is already present.
///
/// The caller supplies the locked shard state and a collector guard. The
/// write hook is not invoked here.
///
/// # Errors
/// * `DbError::KeyExists` when the key is already indexed, either before the
///   append or as reported by the skip-list insert.
/// * Any error from `append_entry`.
fn insert_locked(
    &self,
    shard_id: usize,
    inner: &mut ShardInner,
    guard: &seize::LocalGuard<'_>,
    key: &K,
    value: &[u8],
) -> DbResult<()> {
    let key_bytes = key.as_bytes();
    if self.index.get(key_bytes, guard).is_some() {
        return Err(DbError::KeyExists);
    }

    let (disk_loc, _gsn) = inner.append_entry(shard_id as u8, key_bytes, value, false)?;
    let height = random_height();
    let node_ptr = VarNode::alloc(*key, disk_loc, height);

    if let InsertResult::Exists(_existing) = self.index.insert(node_ptr, guard) {
        // The entry just appended will never be referenced by the index, so
        // it counts as dead space straight away.
        inner.add_dead_bytes(
            disk_loc.file_id as u32,
            crate::entry::entry_size(size_of::<K>(), disk_loc.len),
        );
        unsafe { VarNode::<K>::dealloc_node(node_ptr) };
        return Err(DbError::KeyExists);
    }
    Ok(())
}
/// Removes `key` from the index after appending a tombstone entry.
///
/// Returns `Ok(false)` without writing anything when the key is not indexed,
/// and also when the index removal itself finds nothing. On a successful
/// removal the entry the node referenced is accounted as dead bytes. The
/// write hook is not invoked here.
///
/// # Errors
/// Propagates the error from `append_entry`.
fn delete_locked(
    &self,
    shard_id: usize,
    inner: &mut ShardInner,
    guard: &seize::LocalGuard<'_>,
    key: &K,
) -> DbResult<bool> {
    let key_bytes = key.as_bytes();
    if self.index.get(key_bytes, guard).is_none() {
        return Ok(false);
    }

    // Tombstone: empty value with the tombstone flag set.
    inner.append_entry(shard_id as u8, key_bytes, &[], true)?;

    let Some(node_ptr) = self.index.remove(key_bytes, guard) else {
        return Ok(false);
    };
    let disk = *unsafe { &*node_ptr }.load_disk();
    inner.add_dead_bytes(
        disk.file_id as u32,
        crate::entry::entry_size(size_of::<K>(), disk.len),
    );
    Ok(true)
}
/// Reports whether `key` is present in the index; the value is not read.
pub fn contains(&self, key: &K) -> bool {
let guard = self.index.collector().enter();
self.index.get(key.as_bytes(), &guard).is_some()
}
pub fn first(&self) -> Option<(K, ByteView)> {
let guard = self.index.collector().enter();
let mut ptr = crate::skiplist::strip_mark(unsafe {
(*self.index.head_ptr())
.tower(0)
.load(std::sync::atomic::Ordering::Acquire)
});
while !ptr.is_null() {
let node = unsafe { &*ptr };
if !node.is_marked() {
return self.read_value_cached(node, &guard).map(|v| (node.key, v));
}
ptr = crate::skiplist::strip_mark(
node.tower(0).load(std::sync::atomic::Ordering::Acquire),
);
}
None
}
/// Returns the last entry in iteration order, i.e. `self.iter().next_back()`.
pub fn last(&self) -> Option<(K, ByteView)> {
self.iter().next_back()
}
/// Resolves the node at which a non-reversed range iteration starts.
///
/// * `Included(k)` – the node returned by `find_first_ge(k)`.
/// * `Excluded(k)` – same, except that an unmarked node whose key equals `k`
///   is stepped over to its level-0 successor.
/// * `Unbounded`   – the level-0 successor of the list head.
///
/// The returned pointer may be null.
fn resolve_front_asc(
    &self,
    bound: &Bound<&K>,
    guard: &seize::LocalGuard<'_>,
) -> *mut VarNode<K> {
    use std::sync::atomic::Ordering::Acquire;

    match bound {
        Bound::Unbounded => crate::skiplist::strip_mark(unsafe {
            (*self.index.head_ptr()).tower(0).load(Acquire)
        }),
        Bound::Included(k) => self.index.find_first_ge(k.as_bytes(), guard),
        Bound::Excluded(k) => {
            let candidate = self.index.find_first_ge(k.as_bytes(), guard);
            if candidate.is_null() {
                return candidate;
            }
            let node = unsafe { &*candidate };
            let is_excluded_key = !node.is_marked() && node.key_bytes() == k.as_bytes();
            if is_excluded_key {
                crate::skiplist::strip_mark(node.tower(0).load(Acquire))
            } else {
                candidate
            }
        }
    }
}
/// Resolves the node at which a reversed range iteration starts.
///
/// `range_bounds` calls this with the caller's *end* bound when the tree is
/// reversed. The body is the same as `resolve_front_asc`:
///
/// * `Included(k)` – the node returned by `find_first_ge(k)`.
/// * `Excluded(k)` – same, except that an unmarked node whose key equals `k`
///   is stepped over to its level-0 successor.
/// * `Unbounded`   – the level-0 successor of the list head.
///
/// The returned pointer may be null.
fn resolve_front_rev(
&self,
bound: &Bound<&K>,
guard: &seize::LocalGuard<'_>,
) -> *mut VarNode<K> {
match bound {
Bound::Included(k) => self.index.find_first_ge(k.as_bytes(), guard),
Bound::Excluded(k) => {
let ge = self.index.find_first_ge(k.as_bytes(), guard);
// Skip the node only when it is a live, exact match for the excluded key.
if !ge.is_null()
&& !unsafe { &*ge }.is_marked()
&& unsafe { &*ge }.key_bytes() == k.as_bytes()
{
crate::skiplist::strip_mark(unsafe {
(*ge).tower(0).load(std::sync::atomic::Ordering::Acquire)
})
} else {
ge
}
}
Bound::Unbounded => crate::skiplist::strip_mark(unsafe {
(*self.index.head_ptr())
.tower(0)
.load(std::sync::atomic::Ordering::Acquire)
}),
}
}
/// Computes the seek key and end bound for a prefix scan.
///
/// * Non-reversed: seek at `prefix` padded with `0x00`; the end bound comes
///   from `prefix_to_end_bound`.
/// * Reversed: seek at `prefix` padded with `0xFF`; the end bound is the
///   `0x00`-padded prefix, inclusive.
///
/// # Panics
/// Panics in the slice indexing if `prefix` is longer than the key's byte
/// representation.
fn prefix_bounds(&self, prefix: &[u8]) -> (K, Bound<K>) {
    // Lowest key that starts with `prefix`.
    let mut lowest = K::zeroed();
    lowest.as_bytes_mut()[..prefix.len()].copy_from_slice(prefix);

    if !self.reversed {
        return (lowest, prefix_to_end_bound::<K>(prefix));
    }

    // Highest key that starts with `prefix`: same head, 0xFF tail.
    let mut highest = lowest;
    highest.as_bytes_mut()[prefix.len()..].fill(0xFF);
    (highest, Bound::Included(lowest))
}
/// Returns an iterator positioned at the first node found by
/// `find_first_ge` for the seek key derived from `prefix`, bounded by the end
/// bound from `prefix_bounds`.
///
/// # Panics
/// `prefix_bounds` panics in its slice indexing if `prefix` is longer than
/// the key's byte representation.
pub fn prefix_iter(&self, prefix: &[u8]) -> VarIter<'_, K, H> {
let guard = self.index.collector().enter();
let (search_key, end) = self.prefix_bounds(prefix);
let front = self.index.find_first_ge(search_key.as_bytes(), &guard);
VarIter {
tree: self,
front,
// The back cursor is resolved lazily on the first `next_back` call.
back: None,
end,
start: Bound::Included(search_key),
reversed: self.reversed,
done: false,
_guard: guard,
}
}
/// Returns an unbounded iterator starting at the level-0 successor of the
/// list head. The iterator owns a collector guard for its whole lifetime.
pub fn iter(&self) -> VarIter<'_, K, H> {
let guard = self.index.collector().enter();
let front = crate::skiplist::strip_mark(unsafe {
(*self.index.head_ptr())
.tower(0)
.load(std::sync::atomic::Ordering::Acquire)
});
VarIter {
tree: self,
front,
// The back cursor is resolved lazily on the first `next_back` call.
back: None,
end: Bound::Unbounded,
start: Bound::Unbounded,
reversed: self.reversed,
done: false,
_guard: guard,
}
}
/// Shorthand for `range_bounds(Included(start), Excluded(end))`, i.e. the
/// half-open range `[start, end)`.
pub fn range(&self, start: &K, end: &K) -> VarIter<'_, K, H> {
self.range_bounds(Bound::Included(start), Bound::Excluded(end))
}
/// Returns an iterator over the keys between `start` and `end`.
///
/// For a reversed tree the iteration begins at the caller's `end` bound and
/// the two bounds swap roles inside the iterator; otherwise it begins at
/// `start`.
pub fn range_bounds(&self, start: Bound<&K>, end: Bound<&K>) -> VarIter<'_, K, H> {
    let guard = self.index.collector().enter();

    // (first node, bound checked by `next`, bound checked by `next_back`)
    let (front, iter_end, iter_start) = if self.reversed {
        (
            self.resolve_front_rev(&end, &guard),
            bound_owned(&start),
            bound_owned(&end),
        )
    } else {
        (
            self.resolve_front_asc(&start, &guard),
            bound_owned(&end),
            bound_owned(&start),
        )
    };

    VarIter {
        tree: self,
        front,
        back: None,
        end: iter_end,
        start: iter_start,
        reversed: self.reversed,
        done: false,
        _guard: guard,
    }
}
/// Returns the entry count reported by the skip-list index.
pub fn len(&self) -> usize {
self.index.len()
}
/// Returns `true` when the skip-list index reports itself empty.
pub fn is_empty(&self) -> bool {
self.index.is_empty()
}
/// Calls `write_active_hint` on every shard, passing the key size.
///
/// # Errors
/// Stops at, and returns, the first error a shard reports.
pub fn sync_hints(&self) -> DbResult<()> {
    let key_len = size_of::<K>();
    self.engine
        .shards()
        .iter()
        .try_for_each(|shard| -> DbResult<()> {
            shard.write_active_hint(key_len)?;
            Ok(())
        })
}
pub fn warmup(&self) -> DbResult<()> {
use std::collections::BTreeSet;
let guard = self.index.collector().enter();
let mut blocks: BTreeSet<(u8, u32, u64)> = BTreeSet::new();
let mut current = crate::skiplist::strip_mark(unsafe {
(*self.index.head_ptr())
.tower(0)
.load(std::sync::atomic::Ordering::Acquire)
});
while !current.is_null() {
let node = unsafe { &*current };
current = crate::skiplist::strip_mark(
node.tower(0).load(std::sync::atomic::Ordering::Acquire),
);
if node.is_marked() {
continue;
}
let disk = *node.load_disk();
let block_offset = disk.offset as u64 & !4095;
blocks.insert((disk.shard_id, disk.file_id as u32, block_offset));
}
drop(guard);
for (shard_id, file_id, block_offset) in &blocks {
let key = BlockKey {
shard_id: *shard_id,
file_id: *file_id,
block_offset: *block_offset,
};
if self.cache.get(&key).is_some() {
continue;
}
let shard = &self.engine.shards()[*shard_id as usize];
let (buf, _) = shard.read_block(*file_id, *block_offset)?;
self.cache.insert(key, Arc::new(buf));
}
Ok(())
}
/// Crate-internal access to the underlying skip-list index.
pub(crate) fn index(&self) -> &SkipList<VarNode<K>> {
&self.index
}
/// Walks every live entry and applies the action `f` returns for it.
///
/// * `Keep` – the entry is left untouched; when `H::NEEDS_INIT` is set the
///   hook's `on_init` receives the current value.
/// * `Update(new)` – when `H::NEEDS_INIT` is set `on_init` receives the new
///   value, then the value is written with `put_locked` under the shard lock.
/// * `Delete` – the entry is removed with `delete_locked` under the shard lock.
///
/// Entries whose value cannot be read are skipped. The hook's `on_write` is
/// not called by this function. Returns the number of `Update` and `Delete`
/// actions applied.
///
/// # Errors
/// Returns the first error from `put_locked` or `delete_locked`; entries
/// processed before the error stay modified.
pub fn migrate(
&self,
f: impl Fn(&K, &[u8]) -> crate::MigrateAction<ByteView>,
) -> DbResult<usize> {
use crate::MigrateAction;
let guard = self.index.collector().enter();
let mut current = crate::skiplist::strip_mark(unsafe {
(*self.index.head_ptr())
.tower(0)
.load(std::sync::atomic::Ordering::Acquire)
});
let mut count = 0;
while !current.is_null() {
let node = unsafe { &*current };
// Advance first so that every `continue` below moves on to the successor.
current = crate::skiplist::strip_mark(
node.tower(0).load(std::sync::atomic::Ordering::Acquire),
);
if node.is_marked() {
continue;
}
let value = match self.read_value_cached(node, &guard) {
Some(v) => v,
None => continue,
};
match f(&node.key, &value) {
MigrateAction::Keep => {
if H::NEEDS_INIT {
self.hook.on_init(&node.key, &value);
}
}
MigrateAction::Update(new_value) => {
// The hook is notified before the write is attempted.
if H::NEEDS_INIT {
self.hook.on_init(&node.key, &new_value);
}
let shard_id = self.shard_for(&node.key);
// The shard lock is taken per entry and released at the end of this arm.
let mut inner = self.engine.shards()[shard_id].lock();
self.put_locked(shard_id, &mut inner, &guard, &node.key, &new_value)?;
count += 1;
}
MigrateAction::Delete => {
let shard_id = self.shard_for(&node.key);
let mut inner = self.engine.shards()[shard_id].lock();
self.delete_locked(shard_id, &mut inner, &guard, &node.key)?;
count += 1;
}
}
}
tracing::info!(mutations = count, "var_tree migration complete");
Ok(count)
}
/// Feeds every live, readable entry to the hook's `on_init`.
///
/// Does nothing unless the hook opts in through `H::NEEDS_INIT`. Entries that
/// are marked or whose value cannot be read are skipped.
pub(crate) fn replay_init(&self) {
    use std::sync::atomic::Ordering::Acquire;

    if !H::NEEDS_INIT {
        return;
    }

    let guard = self.index.collector().enter();
    let mut cursor = crate::skiplist::strip_mark(unsafe {
        (*self.index.head_ptr()).tower(0).load(Acquire)
    });
    let mut replayed = 0usize;
    while !cursor.is_null() {
        let node = unsafe { &*cursor };
        // Step to the successor up front; the rest of the body only looks at `node`.
        cursor = crate::skiplist::strip_mark(node.tower(0).load(Acquire));
        if node.is_marked() {
            continue;
        }
        if let Some(value) = self.read_value_cached(node, &guard) {
            self.hook.on_init(&node.key, &value);
            replayed += 1;
        }
    }
    tracing::debug!(replayed = replayed, "var_tree replay_init complete");
}
/// Reads the value stored at `disk` without holding the shard lock up front.
///
/// Lookup order:
/// 1. the block cache (counted as a cache hit);
/// 2. the shard's write buffer, if `disk` lives in the active file;
/// 3. the block read through `get_or_read_block` (counted as a cache miss).
///
/// A value that crosses a 4096-byte block boundary pulls the following block
/// through `get_or_read_block` as well.
///
/// # Errors
/// Propagates errors from `get_or_read_block`.
fn read_value_cached_inner(&self, disk: &DiskLoc) -> DbResult<ByteView> {
    let shard_id = disk.shard_id;
    let file_id = disk.file_id as u32;
    let block_offset = disk.offset as u64 & !4095;
    let start = (disk.offset & 4095) as usize;
    let len = disk.len as usize;

    // Fetches the block right after the one containing the value's start.
    let following_block = || self.get_or_read_block(shard_id, file_id, block_offset + 4096);

    let cache_key = BlockKey {
        shard_id,
        file_id,
        block_offset,
    };
    if let Some(block) = self.cache.get(&cache_key) {
        metrics::counter!("armdb.cache.hit").increment(1);
        return Self::extract_from_block(&block, start, len, following_block);
    }

    {
        // Scoped so the shard lock is released before any block read below.
        let inner = self.engine.shards()[shard_id as usize].lock();
        if inner.active.file_id == file_id
            && let Some(bytes) = inner.write_buf.read(disk.offset as u64, len)
        {
            return Ok(ByteView::new(bytes));
        }
    }

    metrics::counter!("armdb.cache.miss").increment(1);
    let block = self.get_or_read_block(shard_id, file_id, block_offset)?;
    Self::extract_from_block(&block, start, len, following_block)
}
/// Reads the value of `node`, retrying when the location turns out stale.
///
/// Each attempt reloads the node's `DiskLoc` and calls
/// `read_value_cached_inner`. A `DbError::StaleDiskLoc` result triggers a
/// retry, up to `MAX_STALE_RETRIES` attempts in total; any other error, or
/// running out of attempts, yields `None`.
fn read_value_cached(
&self,
node: &VarNode<K>,
_guard: &seize::LocalGuard<'_>,
) -> Option<ByteView> {
for _ in 0..MAX_STALE_RETRIES {
// Reload on every attempt so a location updated meanwhile is picked up.
let disk = *node.load_disk();
match self.read_value_cached_inner(&disk) {
Ok(v) => return Some(v),
Err(DbError::StaleDiskLoc) => {
metrics::counter!("armdb.read.stale_retry", "tree" => "var_tree").increment(1);
continue;
}
Err(_e) => {
#[cfg(feature = "hot-path-tracing")]
tracing::error!("VarTree read_value_cached error: {:?}", _e);
return None;
}
}
}
None
}
/// Copies `len` bytes starting at `start` out of `block`.
///
/// When the value fits inside the first 4096 bytes it is taken from `block`
/// alone. Otherwise `next_block` is called once and the remainder is taken
/// from the start of the block it returns.
///
/// # Errors
/// Propagates the error from `next_block`.
///
/// # Panics
/// Slice indexing panics if `block` or the following block is shorter than
/// the requested range.
fn extract_from_block(
    block: &AlignedBuf,
    start: usize,
    len: usize,
    next_block: impl FnOnce() -> DbResult<Arc<AlignedBuf>>,
) -> DbResult<ByteView> {
    let end = start + len;
    if end <= 4096 {
        return Ok(ByteView::new(&block[start..end]));
    }

    // The value straddles the block boundary: stitch the two pieces together.
    let next = next_block()?;
    let head = &block[start..];
    let tail_len = len - head.len();
    let stitched: Vec<u8> = [head, &next[..tail_len]].concat();
    Ok(ByteView::from_vec(stitched))
}
/// Returns the block at `block_offset` of the given shard/file, from the
/// cache when present and otherwise by reading it through the shard.
///
/// A freshly read block is inserted into the cache only when `read_block`
/// reports it as a full block.
///
/// # Errors
/// Propagates the error from `read_block`.
fn get_or_read_block(
&self,
shard_id: u8,
file_id: u32,
block_offset: u64,
) -> DbResult<Arc<AlignedBuf>> {
let key = BlockKey {
shard_id,
file_id,
block_offset,
};
if let Some(cached) = self.cache.get(&key) {
return Ok(cached);
}
let shard = &self.engine.shards()[shard_id as usize];
let (buf, is_full_block) = shard.read_block(file_id, block_offset)?;
let arc = Arc::new(buf);
// Partial blocks are returned to the caller but deliberately not cached.
if is_full_block {
self.cache.insert(key, arc.clone());
}
Ok(arc)
}
/// Compare-and-swap: replaces the value of `key` with `new_value` only if the
/// current value equals `expected`.
///
/// The comparison and the write happen under the key's shard lock. On success
/// the superseded entry is accounted as dead bytes, the old `DiskLoc` is
/// retired, and `on_write` receives both the old and the new value.
///
/// # Errors
/// * `DbError::KeyNotFound` – the key is not indexed or its value could not
///   be read.
/// * `DbError::CasMismatch` – the current value differs from `expected`.
/// * Any error from `append_entry`.
pub fn cas(&self, key: &K, expected: &[u8], new_value: &[u8]) -> DbResult<()> {
metrics::counter!("armdb.ops", "op" => "cas", "tree" => "var_tree").increment(1);
#[cfg(feature = "hot-path-tracing")]
tracing::trace!("var_tree.cas");
let shard_id = self.shard_for(key);
let shard = &self.engine.shards()[shard_id];
let mut inner = shard.lock();
let guard = self.index.collector().enter();
let existing = self
.index
.get(key.as_bytes(), &guard)
.ok_or(DbError::KeyNotFound)?;
let disk = *existing.load_disk();
let current = self
.read_value_locked(&disk, &inner)
.ok_or(DbError::KeyNotFound)?;
if current.as_ref() != expected {
return Err(DbError::CasMismatch);
}
let (new_disk_loc, _gsn) =
inner.append_entry(shard_id as u8, key.as_bytes(), new_value, false)?;
let new_disk = Box::into_raw(Box::new(new_disk_loc));
let old_disk_ptr = existing.swap_disk(new_disk);
let old_disk = unsafe { *old_disk_ptr };
inner.add_dead_bytes(
old_disk.file_id as u32,
crate::entry::entry_size(size_of::<K>(), old_disk.len),
);
// Retire rather than free: readers may still be using the old pointer.
unsafe {
self.index
.collector()
.retire(old_disk_ptr, seize::reclaim::boxed::<DiskLoc>);
}
self.hook.on_write(key, Some(&*current), Some(new_value));
Ok(())
}
/// Replaces the value of `key` with `f(current)` and returns the new value.
///
/// Returns `Ok(None)` when `update_inner` finds no readable value for `key`.
///
/// # Errors
/// Propagates errors from `update_inner`.
pub fn update(&self, key: &K, f: impl FnOnce(&[u8]) -> ByteView) -> DbResult<Option<ByteView>> {
self.update_inner(key, f, false)
}
/// Replaces the value of `key` with `f(current)` and returns the previous
/// value.
///
/// Returns `Ok(None)` when `update_inner` finds no readable value for `key`.
///
/// # Errors
/// Propagates errors from `update_inner`.
pub fn fetch_update(
&self,
key: &K,
f: impl FnOnce(&[u8]) -> ByteView,
) -> DbResult<Option<ByteView>> {
self.update_inner(key, f, true)
}
/// Read-modify-write of a single key under its shard lock.
///
/// Reads the current value, computes the replacement with `f`, appends it and
/// swaps the node's `DiskLoc`. The superseded entry is accounted as dead
/// bytes, the old `DiskLoc` is retired through the collector, and `on_write`
/// receives both values.
///
/// Returns `Ok(None)` when the key is not indexed or its value cannot be
/// read; otherwise `Some(old)` if `return_old` is set and `Some(new)` if not.
///
/// # Errors
/// Propagates the error from `append_entry`.
fn update_inner(
    &self,
    key: &K,
    f: impl FnOnce(&[u8]) -> ByteView,
    return_old: bool,
) -> DbResult<Option<ByteView>> {
    metrics::counter!("armdb.ops", "op" => "update", "tree" => "var_tree").increment(1);
    #[cfg(feature = "hot-path-tracing")]
    tracing::trace!("var_tree.update");

    let shard_id = self.shard_for(key);
    let shard = &self.engine.shards()[shard_id];
    let mut inner = shard.lock();
    let guard = self.index.collector().enter();

    let Some(existing) = self.index.get(key.as_bytes(), &guard) else {
        return Ok(None);
    };
    let disk = *existing.load_disk();
    let Some(current) = self.read_value_locked(&disk, &inner) else {
        return Ok(None);
    };

    // The callback sees the current bytes and produces the replacement.
    let new_value = f(&current);

    let (new_disk_loc, _gsn) =
        inner.append_entry(shard_id as u8, key.as_bytes(), &new_value, false)?;
    let new_disk = Box::into_raw(Box::new(new_disk_loc));
    let old_disk_ptr = existing.swap_disk(new_disk);
    let old_disk = unsafe { *old_disk_ptr };
    inner.add_dead_bytes(
        old_disk.file_id as u32,
        crate::entry::entry_size(size_of::<K>(), old_disk.len),
    );
    // Retire rather than free: readers may still be using the old pointer.
    unsafe {
        self.index
            .collector()
            .retire(old_disk_ptr, seize::reclaim::boxed::<DiskLoc>);
    }

    self.hook.on_write(key, Some(&*current), Some(&*new_value));
    Ok(Some(if return_old { current } else { new_value }))
}
/// Reads the value at `disk` while the caller already holds the shard lock
/// (`inner`).
///
/// Lookup order: the write buffer (when `disk` is in the active file), then
/// the block cache (only for values that fit in one 4096-byte block), then
/// `read_value_from_disk_locked`. Any read error yields `None`; a
/// `StaleDiskLoc` error is additionally logged.
fn read_value_locked(&self, disk: &DiskLoc, inner: &ShardInner) -> Option<ByteView> {
let len = disk.len as usize;
if inner.active.file_id == disk.file_id as u32
&& let Some(bytes) = inner.write_buf.read(disk.offset as u64, len)
{
return Some(ByteView::new(bytes));
}
let block_offset = disk.offset as u64 & !4095;
let start = (disk.offset & 4095) as usize;
// Values spanning two blocks bypass the cache and go straight to disk.
if start + len <= 4096 {
let cache_key = BlockKey {
shard_id: disk.shard_id,
file_id: disk.file_id as u32,
block_offset,
};
if let Some(block) = self.cache.get(&cache_key) {
return Some(ByteView::new(&block[start..start + len]));
}
}
let bytes = match inner.read_value_from_disk_locked(disk) {
Ok(b) => b,
Err(DbError::StaleDiskLoc) => {
tracing::error!(
file_id = disk.file_id,
shard_id = disk.shard_id,
"stale DiskLoc under shard lock - programming bug",
);
return None;
}
Err(_) => return None,
};
Some(ByteView::new(&bytes))
}
/// Maps `key` to a shard index.
///
/// With `shard_prefix_bits` equal to 0, or at least the key's bit width, the
/// whole key is hashed. Otherwise only the first `shard_prefix_bits` bits are
/// hashed (the trailing bits of a partially covered byte are masked to zero),
/// so keys sharing that prefix land on the same shard.
///
/// # Panics
/// Panics on the modulo if the engine has no shards.
pub fn shard_for(&self, key: &K) -> usize {
    use xxhash_rust::xxh3::xxh3_64;

    let prefix_bits = self.shard_prefix_bits;
    let key_bytes = key.as_bytes();

    let hash = if prefix_bits == 0 || prefix_bits >= size_of::<K>() * 8 {
        xxh3_64(key_bytes)
    } else {
        let whole_bytes = prefix_bits / 8;
        let partial_bits = prefix_bits % 8;
        if partial_bits == 0 {
            xxh3_64(&key_bytes[..whole_bytes])
        } else {
            let mut scratch = K::zeroed();
            let dst = scratch.as_bytes_mut();
            dst[..whole_bytes].copy_from_slice(&key_bytes[..whole_bytes]);
            // Keep only the top `partial_bits` bits of the last prefix byte.
            let keep_high_bits = !(u8::MAX >> partial_bits);
            dst[whole_bytes] = key_bytes[whole_bytes] & keep_high_bits;
            xxh3_64(&scratch.as_bytes()[..whole_bytes + 1])
        }
    };
    (hash as usize) % self.engine.shards().len()
}
}
#[cfg(feature = "replication")]
impl<K: Key, H: WriteHook<K>> crate::replication::ReplicationTarget for VarTree<K, H> {
/// Applies one replicated log entry to the in-memory index.
///
/// Only the index is touched: the value location is derived from
/// `entry_offset` plus the header and key sizes, and `_value` itself is not
/// used. A tombstone removes the key; any other entry inserts a node or, if
/// the key already exists, swaps the new location into the existing node and
/// retires the old one. Dead-byte accounting and the write hook are not
/// involved on this path.
///
/// NOTE(review): `file_id as u16` and `value_offset as u32` are truncating
/// casts — confirm the value ranges guarantee they fit.
fn apply_entry(
&self,
shard_id: u8,
file_id: u32,
entry_offset: u64,
header: &crate::entry::EntryHeader,
key: &[u8],
_value: &[u8],
) -> DbResult<()> {
let key: K = K::from_bytes(key);
// The value sits right after the header and the key.
let value_offset =
entry_offset + size_of::<crate::entry::EntryHeader>() as u64 + size_of::<K>() as u64;
let disk = DiskLoc::new(
shard_id,
file_id as u16,
value_offset as u32,
header.value_len,
);
if header.is_tombstone() {
let guard = self.index.collector().enter();
self.index.remove(key.as_bytes(), &guard);
} else {
let guard = self.index.collector().enter();
let height = random_height();
let node_ptr = VarNode::alloc(key, disk, height);
match self.index.insert(node_ptr, &guard) {
InsertResult::Inserted => {}
// Key already indexed: update the existing node, discard the new one.
InsertResult::Exists(existing) => {
let new_disk_ptr = Box::into_raw(Box::new(disk));
let old_disk_ptr = existing.swap_disk(new_disk_ptr);
unsafe {
self.index
.collector()
.retire(old_disk_ptr, seize::reclaim::boxed::<DiskLoc>);
}
// The unlinked node's disk pointer is nulled before deallocation.
unsafe {
(*node_ptr)
.disk
.store(std::ptr::null_mut(), std::sync::atomic::Ordering::Relaxed);
VarNode::<K>::dealloc_node(node_ptr);
}
}
}
}
Ok(())
}
/// Validates the bytes following an entry header and, if they form a complete
/// entry with a matching CRC, applies it through `apply_entry`.
///
/// Returns `Ok(false)` when `raw_after_header` is too short to hold the key
/// and value, or when the computed CRC differs from `header.crc32`; returns
/// `Ok(true)` once the entry has been applied.
///
/// # Errors
/// Propagates the error from `apply_entry`.
fn try_apply_entry(
    &self,
    shard_id: u8,
    file_id: u32,
    entry_offset: u64,
    header: &crate::entry::EntryHeader,
    raw_after_header: &[u8],
) -> DbResult<bool> {
    let key_len = size_of::<K>();
    let payload_len = key_len + header.value_len as usize;
    if raw_after_header.len() < payload_len {
        return Ok(false);
    }

    let (key, value) = raw_after_header[..payload_len].split_at(key_len);
    let crc = crate::entry::compute_crc32(header.gsn, header.value_len, key, value);
    if crc != header.crc32 {
        return Ok(false);
    }

    self.apply_entry(shard_id, file_id, entry_offset, header, key, value)?;
    Ok(true)
}
/// Size in bytes of the key type `K`.
fn key_len(&self) -> usize {
size_of::<K>()
}
}
/// View of a single shard handed to the closure passed to `VarTree::atomic`.
///
/// It holds the shard's lock and a collector guard for as long as it lives.
pub struct VarShard<'a, K: Key, H: WriteHook<K> = NoHook> {
/// Tree this shard view belongs to.
tree: &'a VarTree<K, H>,
/// Locked shard state; the lock is held until this struct is dropped.
inner: MutexGuard<'a, ShardInner>,
/// Index of the locked shard; keys are checked against it before writes.
shard_id: usize,
/// Collector guard used for the index operations.
guard: seize::LocalGuard<'a>,
}
impl<K: Key, H: WriteHook<K>> VarShard<'_, K, H> {
/// Writes `value` under `key` via `put_locked`, using the lock already held.
///
/// The tree's write hook is not invoked on this path.
///
/// # Errors
/// `DbError::ShardMismatch` if `key` maps to a different shard; otherwise
/// whatever `put_locked` returns.
pub fn put(&mut self, key: &K, value: &[u8]) -> DbResult<()> {
self.check_shard(key)?;
self.tree
.put_locked(self.shard_id, &mut self.inner, &self.guard, key, value)
}
/// Inserts `key` via `insert_locked`, using the lock already held.
///
/// The tree's write hook is not invoked on this path.
///
/// # Errors
/// `DbError::ShardMismatch` if `key` maps to a different shard; otherwise
/// whatever `insert_locked` returns (e.g. `DbError::KeyExists`).
pub fn insert(&mut self, key: &K, value: &[u8]) -> DbResult<()> {
self.check_shard(key)?;
self.tree
.insert_locked(self.shard_id, &mut self.inner, &self.guard, key, value)
}
/// Deletes `key` via `delete_locked`, returning whether an entry was removed.
///
/// The tree's write hook is not invoked on this path.
///
/// # Errors
/// `DbError::ShardMismatch` if `key` maps to a different shard; otherwise
/// whatever `delete_locked` returns.
pub fn delete(&mut self, key: &K) -> DbResult<bool> {
self.check_shard(key)?;
self.tree
.delete_locked(self.shard_id, &mut self.inner, &self.guard, key)
}
/// Reads the value of `key` through `read_value_locked`.
///
/// No shard check is performed here; the value is read using this shard's
/// locked state regardless of which shard `key` maps to.
pub fn get(&self, key: &K) -> Option<ByteView> {
let node = self.tree.index.get(key.as_bytes(), &self.guard)?;
let disk = *node.load_disk();
self.tree.read_value_locked(&disk, &self.inner)
}
/// Like `get`, but maps `None` to `DbError::KeyNotFound`.
pub fn get_or_err(&self, key: &K) -> DbResult<ByteView> {
self.get(key).ok_or(DbError::KeyNotFound)
}
/// Reports whether `key` is present in the tree's index.
pub fn contains(&self, key: &K) -> bool {
self.tree.index.get(key.as_bytes(), &self.guard).is_some()
}
/// Ensures `key` belongs to the shard this view has locked.
fn check_shard(&self, key: &K) -> DbResult<()> {
if self.tree.shard_for(key) != self.shard_id {
return Err(DbError::ShardMismatch);
}
Ok(())
}
}
/// Turns a bound over a borrowed key into a bound that owns a copy of it,
/// keeping the variant (`Included` / `Excluded` / `Unbounded`) unchanged.
fn bound_owned<K: Copy>(b: &Bound<&K>) -> Bound<K> {
    // `Bound<&K>` is `Copy`, so it can be read out of the reference and mapped.
    (*b).map(|key| *key)
}
/// Computes the exclusive upper bound for keys starting with `prefix`.
///
/// The prefix is treated as a big-endian number and incremented by one; the
/// result, padded with zero bytes to the key width, is returned as
/// `Bound::Excluded`. When the prefix is empty or consists solely of `0xFF`
/// bytes there is no such successor and `Bound::Unbounded` is returned.
///
/// # Panics
/// Panics in the slice indexing if a prefix that has a successor is longer
/// than the key's byte representation.
fn prefix_to_end_bound<K: Key>(prefix: &[u8]) -> Bound<K> {
    // Right-most byte that can absorb the carry.
    let Some(pivot) = prefix.iter().rposition(|&byte| byte != 0xFF) else {
        return Bound::Unbounded;
    };

    let mut bumped = prefix.to_vec();
    bumped[pivot] += 1;
    // Every byte after the pivot was 0xFF and wraps around to zero.
    bumped[pivot + 1..].fill(0x00);

    let mut end = K::zeroed();
    end.as_bytes_mut()[..bumped.len()].copy_from_slice(&bumped);
    Bound::Excluded(end)
}
/// Double-ended iterator over the entries of a `VarTree`.
///
/// The iterator owns a collector guard for its whole lifetime.
pub struct VarIter<'a, K: Key, H: WriteHook<K> = NoHook> {
/// Tree being iterated.
tree: &'a VarTree<K, H>,
/// Next node `next` will visit; null once the list is exhausted.
front: *mut VarNode<K>,
/// Next node `next_back` will visit; `None` until `next_back` is first called.
back: Option<*mut VarNode<K>>,
/// Bound checked by `next` (see `check_end`).
end: Bound<K>,
/// Bound checked by `next_back` (see `check_start`).
start: Bound<K>,
/// Whether key comparisons in the bound checks are inverted.
reversed: bool,
/// Set once the iterator is finished from either side.
done: bool,
/// Collector guard held for the lifetime of the iterator.
_guard: seize::LocalGuard<'a>,
}
impl<K: Key, H: WriteHook<K>> Iterator for VarIter<'_, K, H> {
type Item = (K, ByteView);
/// Yields the next entry from the front.
///
/// Marked nodes and nodes whose value cannot be read are skipped. Iteration
/// ends when the list is exhausted, when a key fails `check_end`, or after
/// the node shared with the back cursor has been processed.
fn next(&mut self) -> Option<Self::Item> {
loop {
if self.done || self.front.is_null() {
return None;
}
let node = unsafe { &*self.front };
// True when the front cursor sits on the node the back cursor points at:
// this is the last node that may still be yielded.
let converged = self.back.is_some_and(|back| std::ptr::eq(self.front, back));
// Advance before inspecting the node so `continue` moves to the successor.
self.front = crate::skiplist::strip_mark(
node.tower(0).load(std::sync::atomic::Ordering::Acquire),
);
if converged {
self.done = true;
}
if node.is_marked() {
if converged {
return None;
}
continue;
}
if !self.check_end(&node.key) {
self.done = true;
return None;
}
match self.tree.read_value_cached(node, &self._guard) {
Some(value) => return Some((node.key, value)),
// Unreadable value: skip the entry unless it was the last candidate.
None => {
if converged {
return None;
}
continue;
}
}
}
}
}
impl<K: Key, H: WriteHook<K>> DoubleEndedIterator for VarIter<'_, K, H> {
/// Yields the next entry from the back.
///
/// The back cursor is resolved on the first call. Marked nodes and nodes
/// whose value cannot be read are skipped. Iteration ends when there is no
/// earlier node, when a key fails `check_start`, or after the node shared
/// with the front cursor has been processed.
fn next_back(&mut self) -> Option<Self::Item> {
if self.back.is_none() {
self.back = Some(self.resolve_back());
// Nothing left at the front means nothing can be yielded from the back.
if self.front.is_null() {
self.done = true;
}
}
loop {
let back = self.back.unwrap_or(std::ptr::null_mut());
if self.done || back.is_null() {
return None;
}
let node = unsafe { &*back };
let key = node.key;
// True when both cursors sit on the same node: the last candidate.
let converged = std::ptr::eq(self.front, back);
// Step the back cursor to the last node ordered before the current key.
self.back = Some(self.tree.index().find_last_lt(key.as_bytes()));
if converged {
self.done = true;
}
if node.is_marked() {
if converged {
return None;
}
continue;
}
if !self.check_start(&key) {
self.done = true;
return None;
}
match self.tree.read_value_cached(node, &self._guard) {
Some(value) => return Some((key, value)),
// Unreadable value: skip the entry unless it was the last candidate.
None => {
if converged {
return None;
}
continue;
}
}
}
}
}
impl<K: Key, H: WriteHook<K>> VarIter<'_, K, H> {
/// Finds the node at which backward iteration starts, based on `self.end`.
///
/// * `Unbounded`   – `find_last()`.
/// * `Excluded(k)` – `find_last_lt(k)`.
/// * `Included(k)` – the unmarked node whose key equals `k` if
///   `find_first_ge(k)` returns one, otherwise `find_last_lt(k)`.
///
/// The returned pointer may be null.
fn resolve_back(&self) -> *mut VarNode<K> {
    let index = self.tree.index();
    let inclusive_key = match &self.end {
        Bound::Unbounded => return index.find_last(),
        Bound::Excluded(k) => return index.find_last_lt(k.as_bytes()),
        Bound::Included(k) => k,
    };

    let candidate = index.find_first_ge(inclusive_key.as_bytes(), &self._guard);
    let is_live_exact_match = !candidate.is_null() && {
        let node = unsafe { &*candidate };
        !node.is_marked() && node.key_bytes() == inclusive_key.as_bytes()
    };
    if is_live_exact_match {
        candidate
    } else {
        index.find_last_lt(inclusive_key.as_bytes())
    }
}
/// Returns `true` while `key` is still inside the `end` bound.
///
/// For a reversed iterator the byte comparison is inverted: the key must be
/// greater than (or, for an inclusive bound, at least) the bound.
#[inline(always)]
fn check_end(&self, key: &K) -> bool {
    let key_bytes = key.as_bytes();
    match (&self.end, self.reversed) {
        (Bound::Unbounded, _) => true,
        (Bound::Excluded(end), false) => key_bytes < end.as_bytes(),
        (Bound::Excluded(end), true) => key_bytes > end.as_bytes(),
        (Bound::Included(end), false) => key_bytes <= end.as_bytes(),
        (Bound::Included(end), true) => key_bytes >= end.as_bytes(),
    }
}
/// Returns `true` while `key` is still inside the `start` bound.
///
/// For a reversed iterator the byte comparison is inverted: the key must be
/// less than (or, for an inclusive bound, at most) the bound.
#[inline(always)]
fn check_start(&self, key: &K) -> bool {
    let key_bytes = key.as_bytes();
    match (&self.start, self.reversed) {
        (Bound::Unbounded, _) => true,
        (Bound::Excluded(start), false) => key_bytes > start.as_bytes(),
        (Bound::Excluded(start), true) => key_bytes < start.as_bytes(),
        (Bound::Included(start), false) => key_bytes >= start.as_bytes(),
        (Bound::Included(start), true) => key_bytes <= start.as_bytes(),
    }
}
/// Drains the remaining entries of the iterator and returns only their keys.
pub fn collect_keys(&mut self) -> Vec<K> {
self.map(|(k, _)| k).collect()
}
/// Drains the remaining entries of the iterator into a vector of
/// `(key, value)` pairs.
pub fn collect_entries(&mut self) -> Vec<(K, ByteView)> {
self.collect()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::Config;
use crate::compaction::compact_shard;
use tempfile::tempdir;
use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
/// Test hook that counts callbacks and remembers the most recent payloads.
///
/// The const parameters feed `WriteHook::NEEDS_INIT` and
/// `WriteHook::NEEDS_OLD_VALUE` respectively.
#[derive(Default)]
struct CountingHook<const NEEDS_INIT: bool, const NEEDS_OLD: bool> {
// Number of `on_write` calls.
writes: AtomicUsize,
// Number of `on_write` calls that carried an old value.
writes_with_old: AtomicUsize,
// Number of `on_init` calls.
inits: AtomicUsize,
// `new` argument of the most recent `on_write` call.
last_write_new: crate::sync::Mutex<Option<Vec<u8>>>,
// `value` argument of the most recent `on_init` call.
last_init_value: crate::sync::Mutex<Option<Vec<u8>>>,
}
// `WriteHook` implementation that only records what it was called with.
impl<const NEEDS_INIT: bool, const NEEDS_OLD: bool> WriteHook<[u8; 8]>
for CountingHook<NEEDS_INIT, NEEDS_OLD>
{
const NEEDS_OLD_VALUE: bool = NEEDS_OLD;
const NEEDS_INIT: bool = NEEDS_INIT;
// Counts the write, counts it again if an old value was supplied, and
// stores a copy of the new value (or `None` for a delete).
fn on_write(&self, _key: &[u8; 8], old: Option<&[u8]>, new: Option<&[u8]>) {
self.writes.fetch_add(1, AtomicOrdering::Relaxed);
if old.is_some() {
self.writes_with_old.fetch_add(1, AtomicOrdering::Relaxed);
}
*crate::sync::lock(&self.last_write_new) = new.map(<[u8]>::to_vec);
}
// Counts the init callback and stores a copy of the value it received.
fn on_init(&self, _key: &[u8; 8], value: &[u8]) {
self.inits.fetch_add(1, AtomicOrdering::Relaxed);
*crate::sync::lock(&self.last_init_value) = Some(value.to_vec());
}
}
/// Opens an unhooked tree in `dir` with a single shard, a 4096-byte maximum
/// file size, an 8192-byte write buffer and a compaction threshold of 0.0.
fn open_test_tree(dir: &std::path::Path) -> VarTree<[u8; 8]> {
let mut cfg = Config::test();
cfg.shard_count = 1;
cfg.max_file_size = 4096;
cfg.write_buffer_size = 8192;
cfg.compaction_threshold = 0.0;
VarTree::open(dir, cfg).expect("open test tree")
}
/// Same configuration as `open_test_tree`, but opens the tree with the given
/// `CountingHook` installed.
fn open_test_tree_hooked<const NEEDS_INIT: bool, const NEEDS_OLD: bool>(
dir: &std::path::Path,
hook: CountingHook<NEEDS_INIT, NEEDS_OLD>,
) -> VarTree<[u8; 8], CountingHook<NEEDS_INIT, NEEDS_OLD>> {
let mut cfg = Config::test();
cfg.shard_count = 1;
cfg.max_file_size = 4096;
cfg.write_buffer_size = 8192;
cfg.compaction_threshold = 0.0;
VarTree::open_hooked(dir, cfg, hook).expect("open hooked test tree")
}
/// Overwrites `key` repeatedly so earlier entries become dead, and returns the
/// `DiskLoc` the index held right after the first write.
///
/// The final value stored is `b"final-value-payload-XX"`.
fn put_until_compactable(tree: &VarTree<[u8; 8]>, key: [u8; 8]) -> DiskLoc {
tree.put(&key, &[0u8; 256]).expect("first put");
// Snapshot the location of the very first entry before it is superseded.
let snap = {
let guard = tree.index.collector().enter();
let node = tree.index.get(key.as_bytes(), &guard).expect("indexed");
*node.load_disk()
};
// 19 further 256-byte overwrites of the same key.
for i in 1..20u8 {
tree.put(&key, &[i; 256]).expect("put");
}
tree.put(&key, b"final-value-payload-XX")
.expect("final put");
snap
}
// Reading through a `DiskLoc` captured before compaction must report
// `DbError::StaleDiskLoc` rather than succeed or fail some other way.
#[test]
fn read_value_cached_inner_returns_stale_after_compaction() {
let dir = tempdir().unwrap();
let tree = open_test_tree(dir.path());
let key = 7u64.to_be_bytes();
let snap = put_until_compactable(&tree, key);
let shard = &tree.engine.shards()[snap.shard_id as usize];
let _ = compact_shard(shard, &tree, 0.0).expect("compaction");
match tree.read_value_cached_inner(&snap) {
Err(DbError::StaleDiskLoc) => {}
Ok(v) => panic!("expected StaleDiskLoc, got Ok({:?})", v.as_bytes()),
Err(e) => panic!("expected StaleDiskLoc, got Err({e})"),
}
}
/// After shard 0 is compacted, `read_value_cached` on the freshly looked-up
/// index node must still return the last value written.
#[test]
fn read_value_cached_returns_some_after_compaction() {
    let tmp = tempdir().unwrap();
    let tree = open_test_tree(tmp.path());
    let key = 11u64.to_be_bytes();
    put_until_compactable(&tree, key);

    let _ = compact_shard(&tree.engine.shards()[0], &tree, 0.0).expect("compaction");

    let pin = tree.index.collector().enter();
    let entry = tree.index.get(key.as_bytes(), &pin).expect("indexed");
    let value = tree.read_value_cached(entry, &pin);
    let value = value.expect("post-compaction value must be readable");
    assert_eq!(value.as_bytes(), b"final-value-payload-XX");
}
/// `get` must return the last written value after shard 0 has been compacted.
///
/// NOTE(review): despite the name, compaction has already finished when `get`
/// runs; nothing here executes concurrently with it.
#[test]
fn get_during_compaction_returns_some() {
    let tmp = tempdir().unwrap();
    let tree = open_test_tree(tmp.path());
    let key = 13u64.to_be_bytes();
    put_until_compactable(&tree, key);

    let _ = compact_shard(&tree.engine.shards()[0], &tree, 0.0).expect("compaction");

    let fetched = tree.get(&key);
    let fetched = fetched.expect("post-compaction get");
    assert_eq!(fetched.as_bytes(), b"final-value-payload-XX");
}
/// Iterating the tree after shard 0 has been compacted must yield exactly the
/// three keys written, each with its final value.
///
/// NOTE(review): despite the name, compaction has already finished when the
/// iterator runs; nothing here executes concurrently with it.
#[test]
fn iter_during_compaction_yields_all_live_keys() {
    let tmp = tempdir().unwrap();
    let tree = open_test_tree(tmp.path());
    (1u64..=3).for_each(|n| {
        put_until_compactable(&tree, n.to_be_bytes());
    });

    let _ = compact_shard(&tree.engine.shards()[0], &tree, 0.0).expect("compaction");

    // Materialise the iterator into an ordered map of key -> owned value bytes.
    let mut live: std::collections::BTreeMap<[u8; 8], Vec<u8>> =
        std::collections::BTreeMap::new();
    for (k, v) in tree.iter() {
        live.insert(k, v.as_bytes().to_vec());
    }

    assert_eq!(live.len(), 3);
    (1u64..=3).map(u64::to_be_bytes).for_each(|wanted| {
        let stored = live
            .get(&wanted)
            .expect("every original key must remain");
        assert_eq!(stored.as_slice(), b"final-value-payload-XX");
    });
}
/// Asking for a block of file id 9999 on a freshly opened tree (no writes)
/// must fail with `DbError::StaleDiskLoc`.
#[test]
fn get_or_read_block_returns_stale_for_unknown_file_id() {
    let tmp = tempdir().unwrap();
    let tree = open_test_tree(tmp.path());

    let outcome = tree.get_or_read_block(0, 9999, 0);
    match outcome {
        Ok(_) => panic!("expected StaleDiskLoc, got Ok"),
        Err(DbError::StaleDiskLoc) => {}
        Err(e) => panic!("expected StaleDiskLoc, got Err({e})"),
    }
}
/// An error returned by the continuation closure must surface unchanged from
/// `extract_from_block`.
///
/// The request (start 4090, length 32) extends past the end of the 4096-byte
/// block, which presumably is what makes the closure get consulted.
#[test]
fn extract_from_block_propagates_next_block_error() {
    let zeroed = AlignedBuf::zeroed(4096);

    let outcome: DbResult<ByteView> =
        VarTree::<[u8; 8]>::extract_from_block(&zeroed, 4090, 32, || {
            Err(DbError::StaleDiskLoc)
        });

    match outcome {
        Ok(_) => panic!("expected StaleDiskLoc, got Ok"),
        Err(DbError::StaleDiskLoc) => {}
        Err(e) => panic!("expected StaleDiskLoc, got Err({e})"),
    }
}
/// With a zeroed block planted in the cache for the 4096-aligned offset of a
/// pre-compaction `DiskLoc`, reading through that location must still end in
/// `DbError::StaleDiskLoc`.
///
/// The 4073-byte initial value is presumably sized so the record spills into
/// a second block, which would have to come from the compacted-away file;
/// confirm against the on-disk record layout.
#[test]
fn extract_from_block_multi_block_first_cached_second_stale() {
    let tmp = tempdir().unwrap();
    let tree = open_test_tree(tmp.path());
    let key = 21u64.to_be_bytes();
    tree.put(&key, &vec![0xCDu8; 4073]).expect("initial put");

    // Remember where the first version landed, then release the guard.
    let guard = tree.index.collector().enter();
    let old_loc = *tree
        .index
        .get(key.as_bytes(), &guard)
        .expect("indexed")
        .load_disk();
    drop(guard);

    (0..20u8).for_each(|fill| {
        tree.put(&key, &[fill; 256]).expect("overwrite");
    });
    tree.put(&key, b"final").expect("final");

    let _ = compact_shard(&tree.engine.shards()[0], &tree, 0.0).expect("compaction");

    // Plant a zeroed block under the old location's block-aligned cache key.
    tree.cache.insert(
        BlockKey {
            shard_id: old_loc.shard_id,
            file_id: old_loc.file_id as u32,
            block_offset: (old_loc.offset as u64) & !4095,
        },
        Arc::new(AlignedBuf::zeroed(4096)),
    );

    let outcome = tree.read_value_cached_inner(&old_loc);
    match outcome {
        Ok(_) => panic!("expected StaleDiskLoc from second block, got Ok"),
        Err(DbError::StaleDiskLoc) => {}
        Err(e) => panic!("expected StaleDiskLoc, got Err({e})"),
    }
}
/// When the shard's file bookkeeping is overwritten (immutable list emptied,
/// active file id set to `u32::MAX`), `read_value_cached` must give up and
/// return `None` rather than loop.
#[test]
fn retry_limit_returns_none_on_persistent_stale() {
    let tmp = tempdir().unwrap();
    let tree = open_test_tree(tmp.path());
    let key = 99u64.to_be_bytes();
    tree.put(&key, b"payload").expect("put");

    let loc = {
        let guard = tree.index.collector().enter();
        let node = tree.index.get(key.as_bytes(), &guard).expect("indexed");
        *node.load_disk()
    };

    // Overwrite the shard's view of its files while holding its lock.
    let shard = &tree.engine.shards()[loc.shard_id as usize];
    let mut state = shard.lock();
    state.active.file_id = u32::MAX;
    state.immutable = Vec::new();
    drop(state);

    let pin = tree.index.collector().enter();
    let entry = tree
        .index
        .get(key.as_bytes(), &pin)
        .expect("still indexed");
    let read = tree.read_value_cached(entry, &pin);
    assert!(
        read.is_none(),
        "MAX_STALE_RETRIES must terminate the loop and return None"
    );
}
/// `replay_init` must call `on_init` once per live key (five written, one
/// deleted, so four) and must not call `on_write`.
#[test]
fn var_tree_replay_init_fires_on_init_per_live_key_raw() {
    let tmp = tempdir().unwrap();
    let tree = open_test_tree_hooked::<true, false>(tmp.path(), CountingHook::default());
    let hook = &tree.hook;

    (0u64..5).for_each(|n| {
        tree.put(&n.to_be_bytes(), &[n as u8; 16]).expect("put");
    });
    tree.delete(&3u64.to_be_bytes()).expect("delete");

    // Discard the counts accumulated by the setup writes above.
    hook.inits.store(0, AtomicOrdering::Relaxed);
    hook.writes.store(0, AtomicOrdering::Relaxed);

    tree.replay_init();

    let init_calls = hook.inits.load(AtomicOrdering::Relaxed);
    let write_calls = hook.writes.load(AtomicOrdering::Relaxed);
    assert_eq!(init_calls, 4, "4 live keys");
    assert_eq!(write_calls, 0, "no on_write");
}
/// `replay_init` on a tree opened without a hook must run without panicking
/// and leave key 0 readable.
#[test]
fn var_tree_replay_init_no_hook_is_noop() {
    let tmp = tempdir().unwrap();
    let tree = open_test_tree(tmp.path());
    (0u64..3).for_each(|n| {
        tree.put(&n.to_be_bytes(), &[n as u8; 8]).expect("put");
    });

    tree.replay_init();

    assert!(tree.get(&0u64.to_be_bytes()).is_some());
}
/// A migration that answers `Keep` for every entry must report zero mutations,
/// call `on_init` once per key (four here) and never call `on_write`.
#[test]
fn var_tree_migrate_keep_fires_on_init_not_on_write_raw() {
    let tmp = tempdir().unwrap();
    let tree = open_test_tree_hooked::<true, false>(tmp.path(), CountingHook::default());
    let hook = &tree.hook;

    (0u64..4).for_each(|n| {
        tree.put(&n.to_be_bytes(), &[n as u8; 16]).expect("put");
    });

    // Discard the counts accumulated by the setup writes above.
    hook.inits.store(0, AtomicOrdering::Relaxed);
    hook.writes.store(0, AtomicOrdering::Relaxed);

    let changed = tree
        .migrate(|_, _| crate::MigrateAction::Keep)
        .expect("migrate");
    assert_eq!(changed, 0);

    let init_calls = hook.inits.load(AtomicOrdering::Relaxed);
    assert_eq!(init_calls, 4, "4 keeps -> 4 on_init");
    let write_calls = hook.writes.load(AtomicOrdering::Relaxed);
    assert_eq!(write_calls, 0, "Keep must not fire on_write");
}
/// A migration that answers `Update(new)` must report one mutation, call
/// `on_init` once with the new value, never call `on_write`, and leave the new
/// value readable through `get`.
#[test]
fn var_tree_migrate_update_fires_on_init_with_new_value_raw() {
    let tmp = tempdir().unwrap();
    let tree = open_test_tree_hooked::<true, false>(tmp.path(), CountingHook::default());
    let key = 42u64.to_be_bytes();
    tree.put(&key, b"old-value").expect("put");

    // Discard the counts accumulated by the setup write above.
    tree.hook.inits.store(0, AtomicOrdering::Relaxed);
    tree.hook.writes.store(0, AtomicOrdering::Relaxed);

    let replacement = ByteView::new(b"new-value");
    let changed = tree
        .migrate(move |_, _| crate::MigrateAction::Update(replacement.clone()))
        .expect("migrate");
    assert_eq!(changed, 1);

    let init_calls = tree.hook.inits.load(AtomicOrdering::Relaxed);
    assert_eq!(init_calls, 1);
    // The lock guard is a temporary here so it is released before `get` below.
    assert_eq!(
        tree.hook.last_init_value.lock().as_deref(),
        Some(b"new-value".as_ref()),
        "on_init must receive the NEW value"
    );
    let write_calls = tree.hook.writes.load(AtomicOrdering::Relaxed);
    assert_eq!(
        write_calls, 0,
        "Update must NOT fire on_write (was double-firing through self.put)"
    );

    let stored = tree.get(&key).unwrap();
    assert_eq!(stored.as_bytes(), b"new-value");
}
/// A migration that answers `Delete` must report one mutation, fire neither
/// `on_init` nor `on_write`, and leave the key unreadable.
#[test]
fn var_tree_migrate_delete_fires_no_hooks_raw() {
    let tmp = tempdir().unwrap();
    let tree = open_test_tree_hooked::<true, false>(tmp.path(), CountingHook::default());
    let key = 7u64.to_be_bytes();
    tree.put(&key, b"x").expect("put");

    // Discard the counts accumulated by the setup write above.
    tree.hook.inits.store(0, AtomicOrdering::Relaxed);
    tree.hook.writes.store(0, AtomicOrdering::Relaxed);

    let changed = tree
        .migrate(|_, _| crate::MigrateAction::Delete)
        .expect("migrate");
    assert_eq!(changed, 1);

    let init_calls = tree.hook.inits.load(AtomicOrdering::Relaxed);
    assert_eq!(init_calls, 0);
    let write_calls = tree.hook.writes.load(AtomicOrdering::Relaxed);
    assert_eq!(write_calls, 0);
    assert!(tree.get(&key).is_none());
}
/// With a hook whose `NEEDS_INIT` is `false`, neither a `Keep` migration nor
/// an `Update` migration may fire `on_init` or `on_write`.
#[test]
fn var_tree_migrate_no_init_hook_is_silent_for_keep_and_update() {
    let tmp = tempdir().unwrap();
    let tree = open_test_tree_hooked::<false, false>(tmp.path(), CountingHook::default());
    let hook = &tree.hook;

    (0u64..3).for_each(|n| {
        tree.put(&n.to_be_bytes(), &[n as u8; 16]).expect("put");
    });

    // Discard the counts accumulated by the setup writes above.
    hook.inits.store(0, AtomicOrdering::Relaxed);
    hook.writes.store(0, AtomicOrdering::Relaxed);

    // Phase 1: Keep.
    tree.migrate(|_, _| crate::MigrateAction::Keep)
        .expect("migrate keep");
    let init_calls = hook.inits.load(AtomicOrdering::Relaxed);
    assert_eq!(init_calls, 0, "Keep with NEEDS_INIT=false");
    let write_calls = hook.writes.load(AtomicOrdering::Relaxed);
    assert_eq!(write_calls, 0);

    // Phase 2: Update every entry to the same replacement value.
    let replacement = ByteView::new(b"new");
    tree.migrate(move |_, _| crate::MigrateAction::Update(replacement.clone()))
        .expect("migrate update");
    let init_calls = hook.inits.load(AtomicOrdering::Relaxed);
    assert_eq!(init_calls, 0, "Update with NEEDS_INIT=false");
    let write_calls = hook.writes.load(AtomicOrdering::Relaxed);
    assert_eq!(write_calls, 0, "Update must not fire on_write either");
}
/// A single public `put` must call `on_write` exactly once.
#[test]
fn var_tree_public_put_still_fires_on_write_once() {
    let tmp = tempdir().unwrap();
    let tree = open_test_tree_hooked::<true, false>(tmp.path(), CountingHook::default());

    let key = 1u64.to_be_bytes();
    tree.put(&key, b"v").expect("put");

    let write_calls = tree.hook.writes.load(AtomicOrdering::Relaxed);
    assert_eq!(write_calls, 1);
}
/// A put followed by a delete inside `atomic` must fire neither `on_write`
/// nor `on_init`.
#[test]
fn var_tree_atomic_does_not_fire_hooks() {
    let tmp = tempdir().unwrap();
    let tree = open_test_tree_hooked::<true, false>(tmp.path(), CountingHook::default());
    let key = 1u64.to_be_bytes();

    let outcome = tree.atomic(&key, |shard| {
        shard.put(&key, b"a")?;
        shard.delete(&key)?;
        Ok(())
    });
    outcome.expect("atomic");

    let write_calls = tree.hook.writes.load(AtomicOrdering::Relaxed);
    assert_eq!(write_calls, 0);
    let init_calls = tree.hook.inits.load(AtomicOrdering::Relaxed);
    assert_eq!(init_calls, 0);
}
}