use crate::Key;
use crate::compaction::{CompactionIndex, compact_shard};
use crate::config::Config;
use crate::disk_loc::DiskLoc;
use crate::durability::{Bitcask, Durability, DurabilityInner};
use crate::engine::Engine;
use crate::error::{DbError, DbResult};
use crate::hook::{NoHook, WriteHook};
use crate::key::Location;
use crate::recovery::recover_const_tree;
use crate::skiplist::node::{ConstNode, SkipNode, random_height};
use crate::skiplist::{InsertResult, SkipList};
use crate::sync::MutexGuard;
use std::mem::size_of;
use std::ops::Bound;
/// An ordered key/value tree with fixed-size (`V`-byte) values.
///
/// The in-memory index is a concurrent skip list of [`ConstNode`]s guarded by
/// a `seize` collector; every mutation is also recorded through the
/// durability backend `D` (`Bitcask` by default, or `Fixed` slot storage).
/// An optional write hook `H` observes mutations.
pub struct ConstTree<K: Key, const V: usize, H: WriteHook<K> = NoHook, D: Durability = Bitcask> {
    // Concurrent ordered index mapping keys to value + durable location.
    index: SkipList<ConstNode<K, V, D::Loc>>,
    // Storage backend that persists writes and hands back `D::Loc` locations.
    durability: D,
    // Number of leading key bits used to pick a shard (0 = hash the full key).
    shard_prefix_bits: usize,
    // When true the skip list orders keys in descending byte order.
    reversed: bool,
    // User hook notified on writes (and on init replay when `H::NEEDS_INIT`).
    hook: H,
}
impl<K: Key, const V: usize> ConstTree<K, V, NoHook, Bitcask> {
    /// Opens (or creates) a bitcask-backed tree at `path` with no write hook.
    ///
    /// Convenience wrapper around the hooked open path with [`NoHook`].
    pub fn open(path: impl AsRef<std::path::Path>, config: Config) -> DbResult<Self> {
        let hook = NoHook;
        Self::open_inner(path, config, hook)
    }
}
impl<K: Key, const V: usize, H: WriteHook<K>> ConstTree<K, V, H, Bitcask> {
    /// Opens a bitcask-backed tree at `path`, installing `hook` as the write hook.
    pub fn open_hooked(
        path: impl AsRef<std::path::Path>,
        config: Config,
        hook: H,
    ) -> DbResult<Self> {
        Self::open_inner(path, config, hook)
    }
    /// Shared open path: builds the engine, then rebuilds the in-memory index
    /// from the on-disk shard files.
    fn open_inner(path: impl AsRef<std::path::Path>, config: Config, hook: H) -> DbResult<Self> {
        // Copy the fields we still need after `config` is moved into the engine.
        let compaction_threshold = config.compaction_threshold;
        let shard_prefix_bits = config.shard_prefix_bits;
        let reversed = config.reversed;
        let engine = Engine::open(path, config)?;
        let durability = Bitcask {
            engine,
            compaction_threshold,
        };
        let tree = Self {
            index: SkipList::new(reversed),
            shard_prefix_bits,
            reversed,
            hook,
            durability,
        };
        // Replay the persisted entries into the fresh skip list, tracking the
        // highest GSN (global sequence number) seen on disk.
        let shard_dirs = tree.durability.engine.shard_dirs();
        let shard_dir_refs = Engine::shard_dir_refs(&shard_dirs);
        let shard_ids = tree.durability.engine.shard_ids();
        let hints = tree.durability.engine.hints();
        let max_gsn = recover_const_tree::<K, V>(
            &shard_dir_refs,
            &shard_ids,
            tree.index(),
            hints,
            #[cfg(feature = "encryption")]
            tree.durability.engine.cipher(),
        )?;
        // Future writes must use GSNs strictly greater than anything recovered.
        tree.durability
            .engine
            .gsn()
            .fetch_max(max_gsn + 1, std::sync::atomic::Ordering::Relaxed);
        if hints {
            // Hint files need the key length to interpret their records.
            for shard in tree.durability.engine.shards().iter() {
                shard.set_key_len(size_of::<K>());
            }
        }
        tracing::info!(
            key_size = size_of::<K>(),
            V,
            entries = tree.len(),
            "const_tree recovered"
        );
        Ok(tree)
    }
    /// Shuts the tree down: persists hints (if enabled) and flushes the engine.
    pub fn close(self) -> DbResult<()> {
        if self.durability.engine.hints() {
            self.sync_hints()?;
        }
        self.durability.engine.flush()
    }
    /// Flushes in-memory write buffers without a full close.
    pub fn flush_buffers(&self) -> DbResult<()> {
        self.durability.engine.flush_buffers()
    }
    /// Returns the engine configuration this tree was opened with.
    pub fn config(&self) -> &Config {
        self.durability.engine.config()
    }
    /// Compacts every shard, reclaiming space from stale entries.
    ///
    /// Returns the total number of entries compacted across all shards.
    pub fn compact(&self) -> DbResult<usize> {
        let mut total_compacted = 0;
        for shard in self.durability.engine.shards().iter() {
            total_compacted += compact_shard(shard, self, self.durability.compaction_threshold)?;
        }
        Ok(total_compacted)
    }
    /// Writes a hint file for each shard's active segment to speed up recovery.
    pub fn sync_hints(&self) -> DbResult<()> {
        for shard in self.durability.engine.shards().iter() {
            shard.write_active_hint(size_of::<K>())?;
        }
        Ok(())
    }
}
impl<K: Key, const V: usize, H: WriteHook<K>> CompactionIndex<K> for ConstTree<K, V, H, Bitcask> {
    /// CAS-style relocation used by compaction: repoint `key` from `old_loc`
    /// to `new_loc` only if the index still records `old_loc` for it.
    ///
    /// Returns `true` when the location was updated, `false` when the key is
    /// gone or has been rewritten since `old_loc` was observed.
    fn update_if_match(&self, key: &K, old_loc: DiskLoc, new_loc: DiskLoc) -> bool {
        let guard = self.index.collector().enter();
        match self.index.get(key.as_bytes(), &guard) {
            Some(node) if node.read_loc() == old_loc => {
                node.write_loc(new_loc);
                true
            }
            _ => false,
        }
    }
    /// Returns whether `key` is currently present in the index.
    fn contains_key(&self, key: &K) -> bool {
        self.contains(key)
    }
}
use crate::durability::Fixed;
use crate::fixed::config::FixedConfig;
impl<K: Key, const V: usize> ConstTree<K, V, NoHook, Fixed> {
    /// Opens (or creates) a fixed-slot-backed tree at `path` with no write hook.
    ///
    /// Convenience wrapper around the hooked open path with [`NoHook`].
    pub fn open(path: impl AsRef<std::path::Path>, config: FixedConfig) -> DbResult<Self> {
        let hook = NoHook;
        Self::open_fixed_inner(path, config, hook)
    }
}
impl<K: Key, const V: usize, H: WriteHook<K>> ConstTree<K, V, H, Fixed> {
    /// Opens a fixed-slot-backed tree at `path`, installing `hook` as the write hook.
    pub fn open_with_hook(
        path: impl AsRef<std::path::Path>,
        config: FixedConfig,
        hook: H,
    ) -> DbResult<Self> {
        Self::open_fixed_inner(path, config, hook)
    }
    /// Shared open path for the fixed backend: opens the slot storage, then
    /// rebuilds the index by replaying every live slot entry.
    fn open_fixed_inner(
        path: impl AsRef<std::path::Path>,
        config: FixedConfig,
        hook: H,
    ) -> DbResult<Self> {
        let shard_prefix_bits = config.shard_prefix_bits;
        let dur = Fixed::open(path, config, size_of::<K>(), V)?;
        // The fixed backend always builds an ascending index (no `reversed`
        // option on FixedConfig).
        let index = SkipList::new(false);
        let total_recovered =
            dur.recover_entries(|_shard_id, key_bytes, value_bytes, slot_id| {
                // Each recovered entry becomes a fresh node whose location is
                // the backing slot id.
                let key = K::from_bytes(key_bytes);
                let mut value = [0u8; V];
                value.copy_from_slice(value_bytes);
                let height = random_height();
                let node = ConstNode::<K, V, u32>::alloc(key, value, slot_id, height);
                let guard = index.collector().enter();
                let _ = index.insert(node, &guard);
            })?;
        tracing::info!(
            key_size = size_of::<K>(),
            V,
            entries = total_recovered,
            "fixed_tree recovered"
        );
        Ok(Self {
            index,
            durability: dur,
            shard_prefix_bits,
            reversed: false,
            hook,
        })
    }
    /// Shuts the tree down by closing the fixed-slot storage.
    pub fn close(self) -> DbResult<()> {
        self.durability.close()
    }
}
#[cfg(feature = "replication")]
impl<K: Key, const V: usize, H: WriteHook<K>> ConstTree<K, V, H, Fixed> {
pub(crate) fn fixed_durability(&self) -> &Fixed {
&self.durability
}
pub fn fixed_engine_access(
&self,
) -> std::sync::Arc<dyn crate::fixed_replication::FixedEngineAccess> {
self.durability.engine.clone()
}
pub(crate) fn get_slot_id(&self, key: &K) -> Option<u32> {
let guard = self.index.collector().enter();
let node = self.index.get(key.as_bytes(), &guard)?;
Some(node.read_loc())
}
pub(crate) fn remove_key_if_slot_matches(&self, key: &K, slot_id: u32) -> bool {
let guard = self.index.collector().enter();
let node = match self.index.get(key.as_bytes(), &guard) {
Some(n) => n,
None => return false,
};
if node.read_loc() != slot_id {
return false;
}
self.index.remove(key.as_bytes(), &guard);
true
}
pub(crate) fn upsert_replicated(&self, key: &K, value: [u8; V], slot_id: u32) {
let guard = self.index.collector().enter();
if let Some(existing) = self.index.get(key.as_bytes(), &guard) {
if existing.read_loc() == slot_id {
existing.write_data(slot_id, &value);
return;
}
self.index.remove(key.as_bytes(), &guard);
}
let height = random_height();
let node_ptr = ConstNode::<K, V, u32>::alloc(*key, value, slot_id, height);
match self.index.insert(node_ptr, &guard) {
InsertResult::Inserted => {}
InsertResult::Exists(existing) => {
debug_assert!(
false,
"upsert_replicated: unexpected concurrent insert for key"
);
existing.write_data(slot_id, &value);
unsafe {
ConstNode::<K, V, u32>::dealloc_node(node_ptr);
}
}
}
}
}
impl<K: Key, const V: usize, H: WriteHook<K>, D: Durability> ConstTree<K, V, H, D> {
    /// Returns a copy of the value stored for `key`, or `None` if absent.
    pub fn get(&self, key: &K) -> Option<[u8; V]> {
        metrics::counter!("armdb.ops", "op" => "get", "tree" => "const_tree").increment(1);
        #[cfg(feature = "hot-path-tracing")]
        tracing::trace!("const_tree.get");
        // The seize guard keeps the node alive while its value is copied out.
        let guard = self.index.collector().enter();
        let node = self.index.get(key.as_bytes(), &guard)?;
        Some(node.read_value())
    }
    /// Like [`Self::get`] but maps a missing key to `DbError::KeyNotFound`.
    pub fn get_or_err(&self, key: &K) -> DbResult<[u8; V]> {
        self.get(key).ok_or(DbError::KeyNotFound)
    }
    /// Inserts or overwrites `key`, returning the previous value if any.
    ///
    /// The durable write and index update happen under the shard lock. If the
    /// shard reports a sync is due, the lock is released and re-acquired for
    /// the sync so other writers are not blocked behind the fsync. The write
    /// hook fires last, outside the lock.
    pub fn put(&self, key: &K, value: &[u8; V]) -> DbResult<Option<[u8; V]>> {
        metrics::counter!("armdb.ops", "op" => "put", "tree" => "const_tree").increment(1);
        #[cfg(feature = "hot-path-tracing")]
        tracing::trace!("const_tree.put");
        let shard_id = self.shard_for(key);
        let mut inner = self.durability.lock_shard(shard_id);
        let guard = self.index.collector().enter();
        let old = self.put_locked(shard_id, &mut *inner, &guard, key, value)?;
        let needs_sync = inner.should_sync();
        drop(inner);
        if needs_sync {
            self.durability.lock_shard(shard_id).sync()?;
        }
        self.hook
            .on_write(key, old.as_ref().map(|v| &v[..]), Some(&value[..]));
        Ok(old)
    }
    /// Inserts `key`, failing with `DbError::KeyExists` if it is present.
    ///
    /// Same lock/sync/hook ordering as [`Self::put`].
    pub fn insert(&self, key: &K, value: &[u8; V]) -> DbResult<()> {
        metrics::counter!("armdb.ops", "op" => "insert", "tree" => "const_tree").increment(1);
        #[cfg(feature = "hot-path-tracing")]
        tracing::trace!("const_tree.insert");
        let shard_id = self.shard_for(key);
        let mut inner = self.durability.lock_shard(shard_id);
        let guard = self.index.collector().enter();
        self.insert_locked(shard_id, &mut *inner, &guard, key, value)?;
        let needs_sync = inner.should_sync();
        drop(inner);
        if needs_sync {
            self.durability.lock_shard(shard_id).sync()?;
        }
        self.hook.on_write(key, None, Some(&value[..]));
        Ok(())
    }
    /// Deletes `key`, returning its previous value if it existed.
    ///
    /// The hook only fires when a value was actually removed.
    pub fn delete(&self, key: &K) -> DbResult<Option<[u8; V]>> {
        metrics::counter!("armdb.ops", "op" => "delete", "tree" => "const_tree").increment(1);
        #[cfg(feature = "hot-path-tracing")]
        tracing::trace!("const_tree.delete");
        let shard_id = self.shard_for(key);
        let mut inner = self.durability.lock_shard(shard_id);
        let guard = self.index.collector().enter();
        let old = self.delete_locked(shard_id, &mut *inner, &guard, key)?;
        let needs_sync = inner.should_sync();
        drop(inner);
        if needs_sync {
            self.durability.lock_shard(shard_id).sync()?;
        }
        if let Some(ref old_val) = old {
            self.hook.on_write(key, Some(&old_val[..]), None);
        }
        Ok(old)
    }
    /// Runs `f` with the shard that owns `shard_key` locked for the duration,
    /// so multiple operations on keys in that shard execute atomically.
    ///
    /// Operations inside `f` on keys belonging to a *different* shard are
    /// rejected by [`ConstShard::check_shard`]. No automatic sync and no write
    /// hooks fire for operations performed through the shard handle.
    pub fn atomic<R>(
        &self,
        shard_key: &K,
        f: impl FnOnce(&mut ConstShard<'_, K, V, H, D>) -> DbResult<R>,
    ) -> DbResult<R> {
        let shard_id = self.shard_for(shard_key);
        let inner = self.durability.lock_shard(shard_id);
        let guard = self.index.collector().enter();
        let mut shard = ConstShard {
            tree: self,
            inner,
            shard_id,
            guard,
        };
        f(&mut shard)
    }
    /// Core upsert, called with the shard lock held.
    ///
    /// Fast path: the key exists, so the durable record is rewritten and the
    /// node updated in place. Slow path: a new node is written and inserted;
    /// if another thread inserted the same key concurrently, the provisional
    /// durable record is discarded and the existing node is updated instead.
    fn put_locked(
        &self,
        shard_id: usize,
        inner: &mut D::Inner,
        guard: &seize::LocalGuard<'_>,
        key: &K,
        value: &[u8; V],
    ) -> DbResult<Option<[u8; V]>> {
        if let Some(existing) = self.index.get(key.as_bytes(), guard) {
            let old_value = existing.read_value();
            let old_loc = existing.read_loc();
            let new_loc = inner.write_update(shard_id as u8, old_loc, key.as_bytes(), value)?;
            existing.write_data(new_loc, value);
            return Ok(Some(old_value));
        }
        let loc = inner.write_new(shard_id as u8, key.as_bytes(), value)?;
        let height = random_height();
        let node_ptr = ConstNode::<K, V, D::Loc>::alloc(*key, *value, loc, height);
        match self.index.insert(node_ptr, guard) {
            InsertResult::Inserted => Ok(None),
            InsertResult::Exists(existing) => {
                // Lost the insert race: undo the provisional write, then
                // redo it as an update of the winner's record.
                inner.write_discard(loc)?;
                let old_value = existing.read_value();
                let old_loc = existing.read_loc();
                let new_loc = inner.write_update(shard_id as u8, old_loc, key.as_bytes(), value)?;
                existing.write_data(new_loc, value);
                unsafe {
                    // SAFETY: `node_ptr` was never published to the list
                    // (insert returned Exists), so we are its sole owner.
                    ConstNode::<K, V, D::Loc>::dealloc_node(node_ptr);
                }
                Ok(Some(old_value))
            }
        }
    }
    /// Core insert-if-absent, called with the shard lock held.
    ///
    /// Returns `DbError::KeyExists` if the key is already present, including
    /// when a concurrent insert wins the race (in which case the provisional
    /// durable record is discarded).
    fn insert_locked(
        &self,
        shard_id: usize,
        inner: &mut D::Inner,
        guard: &seize::LocalGuard<'_>,
        key: &K,
        value: &[u8; V],
    ) -> DbResult<()> {
        if self.index.get(key.as_bytes(), guard).is_some() {
            return Err(DbError::KeyExists);
        }
        let loc = inner.write_new(shard_id as u8, key.as_bytes(), value)?;
        let height = random_height();
        let node_ptr = ConstNode::<K, V, D::Loc>::alloc(*key, *value, loc, height);
        match self.index.insert(node_ptr, guard) {
            InsertResult::Inserted => Ok(()),
            InsertResult::Exists(_) => {
                inner.write_discard(loc)?;
                unsafe {
                    // SAFETY: `node_ptr` was never published to the list.
                    ConstNode::<K, V, D::Loc>::dealloc_node(node_ptr);
                }
                Err(DbError::KeyExists)
            }
        }
    }
    /// Core delete, called with the shard lock held.
    ///
    /// Writes the tombstone to durable storage *before* unlinking the index
    /// node, so a crash in between leaves the key deleted on recovery.
    fn delete_locked(
        &self,
        shard_id: usize,
        inner: &mut D::Inner,
        guard: &seize::LocalGuard<'_>,
        key: &K,
    ) -> DbResult<Option<[u8; V]>> {
        if let Some(existing) = self.index.get(key.as_bytes(), guard) {
            let old_value = existing.read_value();
            let old_loc = existing.read_loc();
            inner.write_tombstone(shard_id as u8, old_loc, key.as_bytes())?;
            self.index.remove(key.as_bytes(), guard);
            return Ok(Some(old_value));
        }
        Ok(None)
    }
    /// `put` without metrics, sync scheduling, or the write hook — used by
    /// [`Self::migrate`], which invokes hooks itself via `on_init`.
    fn put_no_hook(&self, key: &K, value: &[u8; V]) -> DbResult<Option<[u8; V]>> {
        let shard_id = self.shard_for(key);
        let mut inner = self.durability.lock_shard(shard_id);
        let guard = self.index.collector().enter();
        self.put_locked(shard_id, &mut *inner, &guard, key, value)
    }
    /// `delete` without metrics, sync scheduling, or the write hook.
    fn delete_no_hook(&self, key: &K) -> DbResult<Option<[u8; V]>> {
        let shard_id = self.shard_for(key);
        let mut inner = self.durability.lock_shard(shard_id);
        let guard = self.index.collector().enter();
        self.delete_locked(shard_id, &mut *inner, &guard, key)
    }
    /// Compare-and-swap: replaces the value only if it currently equals
    /// `expected`.
    ///
    /// # Errors
    /// `DbError::KeyNotFound` if the key is absent, `DbError::CasMismatch` if
    /// the stored value differs from `expected`. The shard lock is held from
    /// the read through the write, making the compare-and-swap atomic with
    /// respect to other mutating operations on the same shard.
    pub fn cas(&self, key: &K, expected: &[u8; V], new_value: &[u8; V]) -> DbResult<()> {
        metrics::counter!("armdb.ops", "op" => "cas", "tree" => "const_tree").increment(1);
        #[cfg(feature = "hot-path-tracing")]
        tracing::trace!("const_tree.cas");
        let shard_id = self.shard_for(key);
        let mut inner = self.durability.lock_shard(shard_id);
        let guard = self.index.collector().enter();
        let existing = self
            .index
            .get(key.as_bytes(), &guard)
            .ok_or(DbError::KeyNotFound)?;
        let current_value = existing.read_value();
        if current_value != *expected {
            return Err(DbError::CasMismatch);
        }
        let old_loc = existing.read_loc();
        let new_loc = inner.write_update(shard_id as u8, old_loc, key.as_bytes(), new_value)?;
        existing.write_data(new_loc, new_value);
        let needs_sync = inner.should_sync();
        drop(inner);
        if needs_sync {
            self.durability.lock_shard(shard_id).sync()?;
        }
        self.hook
            .on_write(key, Some(&expected[..]), Some(&new_value[..]));
        Ok(())
    }
    /// Applies `f` to the current value; returns `Some(new_value)` on success
    /// and `None` (without calling `f`) if the key is absent.
    pub fn update(
        &self,
        key: &K,
        f: impl FnOnce(&[u8; V]) -> [u8; V],
    ) -> DbResult<Option<[u8; V]>> {
        self.update_inner(key, f, false)
    }
    /// Like [`Self::update`] but returns the *previous* value on success.
    pub fn fetch_update(
        &self,
        key: &K,
        f: impl FnOnce(&[u8; V]) -> [u8; V],
    ) -> DbResult<Option<[u8; V]>> {
        self.update_inner(key, f, true)
    }
    /// Shared read-modify-write path; `return_old` selects which value the
    /// caller gets back on success.
    fn update_inner(
        &self,
        key: &K,
        f: impl FnOnce(&[u8; V]) -> [u8; V],
        return_old: bool,
    ) -> DbResult<Option<[u8; V]>> {
        metrics::counter!("armdb.ops", "op" => "update", "tree" => "const_tree").increment(1);
        #[cfg(feature = "hot-path-tracing")]
        tracing::trace!("const_tree.update");
        let shard_id = self.shard_for(key);
        // Holding the shard lock across read -> f -> write makes the whole
        // read-modify-write atomic with respect to other shard mutators.
        let mut inner = self.durability.lock_shard(shard_id);
        let guard = self.index.collector().enter();
        let existing = match self.index.get(key.as_bytes(), &guard) {
            Some(n) => n,
            None => return Ok(None),
        };
        let old_value = existing.read_value();
        let new_value = f(&old_value);
        let old_loc = existing.read_loc();
        let new_loc = inner.write_update(shard_id as u8, old_loc, key.as_bytes(), &new_value)?;
        existing.write_data(new_loc, &new_value);
        let needs_sync = inner.should_sync();
        drop(inner);
        if needs_sync {
            self.durability.lock_shard(shard_id).sync()?;
        }
        self.hook
            .on_write(key, Some(&old_value[..]), Some(&new_value[..]));
        Ok(Some(if return_old { old_value } else { new_value }))
    }
    /// Returns whether `key` is currently present.
    pub fn contains(&self, key: &K) -> bool {
        self.get(key).is_some()
    }
    /// Returns the entry at the head of the index (smallest key, or largest
    /// when the tree is reversed), skipping nodes marked for deletion.
    pub fn first(&self) -> Option<(K, [u8; V])> {
        let guard = self.index.collector().enter();
        // Start from the head's level-0 successor and walk forward past any
        // logically-deleted (marked) nodes.
        let mut ptr = crate::skiplist::strip_mark(unsafe {
            (*self.index.head_ptr())
                .tower(0)
                .load(std::sync::atomic::Ordering::Acquire)
        });
        while !ptr.is_null() {
            let node = unsafe { &*ptr };
            if !node.is_marked() {
                return Some((node.key, node.read_value()));
            }
            ptr = crate::skiplist::strip_mark(
                node.tower(0).load(std::sync::atomic::Ordering::Acquire),
            );
        }
        // Keep the guard alive for the full pointer walk above.
        let _ = guard;
        None
    }
    /// Returns the entry at the tail of the index.
    pub fn last(&self) -> Option<(K, [u8; V])> {
        self.iter().next_back()
    }
    /// Resolves the first node of an ascending range scan for `bound`.
    ///
    /// `Included` starts at the first node >= the key; `Excluded` skips past
    /// an exact (live) match; `Unbounded` starts at the head's successor.
    fn resolve_front_asc(
        &self,
        bound: &Bound<&K>,
        guard: &seize::LocalGuard<'_>,
    ) -> *mut ConstNode<K, V, D::Loc> {
        match bound {
            Bound::Included(k) => self.index.find_first_ge(k.as_bytes(), guard),
            Bound::Excluded(k) => {
                let ge = self.index.find_first_ge(k.as_bytes(), guard);
                // If the ge-node is a live, exact match, step over it.
                if !ge.is_null()
                    && !unsafe { &*ge }.is_marked()
                    && unsafe { &*ge }.key_bytes() == k.as_bytes()
                {
                    crate::skiplist::strip_mark(unsafe {
                        (*ge).tower(0).load(std::sync::atomic::Ordering::Acquire)
                    })
                } else {
                    ge
                }
            }
            Bound::Unbounded => crate::skiplist::strip_mark(unsafe {
                (*self.index.head_ptr())
                    .tower(0)
                    .load(std::sync::atomic::Ordering::Acquire)
            }),
        }
    }
fn resolve_front_rev(
&self,
bound: &Bound<&K>,
guard: &seize::LocalGuard<'_>,
) -> *mut ConstNode<K, V, D::Loc> {
match bound {
Bound::Included(k) => self.index.find_first_ge(k.as_bytes(), guard),
Bound::Excluded(k) => {
let ge = self.index.find_first_ge(k.as_bytes(), guard);
if !ge.is_null()
&& !unsafe { &*ge }.is_marked()
&& unsafe { &*ge }.key_bytes() == k.as_bytes()
{
crate::skiplist::strip_mark(unsafe {
(*ge).tower(0).load(std::sync::atomic::Ordering::Acquire)
})
} else {
ge
}
}
Bound::Unbounded => crate::skiplist::strip_mark(unsafe {
(*self.index.head_ptr())
.tower(0)
.load(std::sync::atomic::Ordering::Acquire)
}),
}
}
    /// Computes the (start-key, end-bound) pair for a prefix scan.
    ///
    /// Ascending: start at `prefix` padded with zeros, end just before the
    /// incremented prefix. Reversed: start at `prefix` padded with 0xFF (the
    /// largest key with this prefix) and include the zero-padded prefix as
    /// the final key.
    fn prefix_bounds(&self, prefix: &[u8]) -> (K, Bound<K>) {
        if self.reversed {
            let mut search = K::zeroed();
            search.as_bytes_mut().fill(0xFF);
            search.as_bytes_mut()[..prefix.len()].copy_from_slice(prefix);
            let mut end_key = K::zeroed();
            end_key.as_bytes_mut()[..prefix.len()].copy_from_slice(prefix);
            (search, Bound::Included(end_key))
        } else {
            let mut search = K::zeroed();
            search.as_bytes_mut()[..prefix.len()].copy_from_slice(prefix);
            let end = prefix_to_end_bound::<K>(prefix);
            (search, end)
        }
    }
    /// Iterates over every entry whose key starts with `prefix`, in this
    /// tree's native order.
    ///
    /// NOTE(review): `prefix.len()` is assumed to be <= `size_of::<K>()`;
    /// a longer prefix would panic in `prefix_bounds` — confirm callers.
    pub fn prefix_iter(&self, prefix: &[u8]) -> ConstIter<'_, K, V, D::Loc> {
        let guard = self.index.collector().enter();
        let (search_key, end) = self.prefix_bounds(prefix);
        let front = self.index.find_first_ge(search_key.as_bytes(), &guard);
        ConstIter {
            list: &self.index,
            front,
            back: None,
            end,
            start: Bound::Included(search_key),
            reversed: self.reversed,
            done: false,
            _guard: guard,
        }
    }
    /// Iterates over every entry in the tree's native order.
    pub fn iter(&self) -> ConstIter<'_, K, V, D::Loc> {
        let guard = self.index.collector().enter();
        // Begin at the head's level-0 successor (the list's first node).
        let front = crate::skiplist::strip_mark(unsafe {
            (*self.index.head_ptr())
                .tower(0)
                .load(std::sync::atomic::Ordering::Acquire)
        });
        ConstIter {
            list: &self.index,
            front,
            back: None,
            end: Bound::Unbounded,
            start: Bound::Unbounded,
            reversed: self.reversed,
            done: false,
            _guard: guard,
        }
    }
    /// Half-open range scan: `start` inclusive, `end` exclusive.
    pub fn range(&self, start: &K, end: &K) -> ConstIter<'_, K, V, D::Loc> {
        self.range_bounds(Bound::Included(start), Bound::Excluded(end))
    }
    /// Range scan with explicit bounds on both ends.
    ///
    /// On a reversed tree the skip list stores keys in descending order, so
    /// the caller's `end` bound becomes the iterator's physical front and the
    /// `start` bound becomes its stopping condition.
    pub fn range_bounds(&self, start: Bound<&K>, end: Bound<&K>) -> ConstIter<'_, K, V, D::Loc> {
        let guard = self.index.collector().enter();
        if self.reversed {
            let front = self.resolve_front_rev(&end, &guard);
            ConstIter {
                list: &self.index,
                front,
                back: None,
                end: bound_owned(&start),
                start: bound_owned(&end),
                reversed: true,
                done: false,
                _guard: guard,
            }
        } else {
            let front = self.resolve_front_asc(&start, &guard);
            ConstIter {
                list: &self.index,
                front,
                back: None,
                end: bound_owned(&end),
                start: bound_owned(&start),
                reversed: false,
                done: false,
                _guard: guard,
            }
        }
    }
    /// Returns the number of live entries in the index.
    pub fn len(&self) -> usize {
        self.index.len()
    }
    /// Returns whether the tree holds no entries.
    pub fn is_empty(&self) -> bool {
        self.index.is_empty()
    }
    /// Visits every live entry, applying the `MigrateAction` that `f` returns
    /// (keep, rewrite, or delete). Returns the number of mutations performed.
    ///
    /// Used for schema/value migrations at startup. Hooks are not fired via
    /// the normal write path; instead `on_init` is invoked (when the hook
    /// requires init) so hook state matches the post-migration contents.
    ///
    /// NOTE(review): mutations go through `put_no_hook`/`delete_no_hook`
    /// while this loop walks the level-0 chain, so entries inserted or moved
    /// during the walk may or may not be visited — assumed acceptable for a
    /// startup-time migration; confirm no concurrent writers run here.
    pub fn migrate(
        &self,
        f: impl Fn(&K, &[u8; V]) -> crate::MigrateAction<[u8; V]>,
    ) -> DbResult<usize> {
        use crate::MigrateAction;
        let _guard = self.index.collector().enter();
        let mut current = crate::skiplist::strip_mark(unsafe {
            (*self.index.head_ptr())
                .tower(0)
                .load(std::sync::atomic::Ordering::Acquire)
        });
        let mut count = 0;
        while !current.is_null() {
            let node = unsafe { &*current };
            // Advance before mutating so removal of `node` can't strand us.
            current = crate::skiplist::strip_mark(
                node.tower(0).load(std::sync::atomic::Ordering::Acquire),
            );
            if node.is_marked() {
                continue;
            }
            let value = node.read_value();
            match f(&node.key, &value) {
                MigrateAction::Keep => {
                    if H::NEEDS_INIT {
                        self.hook.on_init(&node.key, &value[..]);
                    }
                }
                MigrateAction::Update(value) => {
                    if H::NEEDS_INIT {
                        // Init the hook with the *new* value being written.
                        self.hook.on_init(&node.key, &value[..]);
                    }
                    self.put_no_hook(&node.key, &value)?;
                    count += 1;
                }
                MigrateAction::Delete => {
                    self.delete_no_hook(&node.key)?;
                    count += 1;
                }
            }
        }
        tracing::info!(mutations = count, "const_tree migration complete");
        Ok(count)
    }
    /// Replays every live entry into the hook's `on_init` callback.
    ///
    /// No-op unless `H::NEEDS_INIT`; used after recovery so hook-side state
    /// can be rebuilt from the tree's contents.
    pub(crate) fn replay_init(&self) {
        if !H::NEEDS_INIT {
            return;
        }
        let _guard = self.index.collector().enter();
        // Walk the full level-0 chain, skipping logically-deleted nodes.
        let mut current = crate::skiplist::strip_mark(unsafe {
            (*self.index.head_ptr())
                .tower(0)
                .load(std::sync::atomic::Ordering::Acquire)
        });
        while !current.is_null() {
            let node = unsafe { &*current };
            current = crate::skiplist::strip_mark(
                node.tower(0).load(std::sync::atomic::Ordering::Acquire),
            );
            if !node.is_marked() {
                self.hook.on_init(&node.key, &node.read_value()[..]);
            }
        }
    }
    /// Crate-internal access to the underlying skip list (used by recovery).
    pub(crate) fn index(&self) -> &SkipList<ConstNode<K, V, D::Loc>> {
        &self.index
    }
    /// Maps `key` to a shard index.
    ///
    /// With `shard_prefix_bits == 0` (or >= the full key width) the whole key
    /// is hashed. Otherwise only the leading `shard_prefix_bits` bits are
    /// hashed, so keys sharing that prefix land in the same shard — which is
    /// what makes [`Self::atomic`] usable across related keys.
    pub fn shard_for(&self, key: &K) -> usize {
        if self.shard_prefix_bits == 0 || self.shard_prefix_bits >= size_of::<K>() * 8 {
            let hash = xxhash_rust::xxh3::xxh3_64(key.as_bytes());
            return (hash as usize) % self.durability.shard_count();
        }
        let full_bytes = self.shard_prefix_bits / 8;
        let extra_bits = self.shard_prefix_bits % 8;
        let hash = if extra_bits == 0 {
            xxhash_rust::xxh3::xxh3_64(&key.as_bytes()[..full_bytes])
        } else {
            // Partial byte: zero out the low (8 - extra_bits) bits of the
            // last byte so keys differing only there hash identically.
            let mut buf = K::zeroed();
            buf.as_bytes_mut()[..full_bytes + 1].copy_from_slice(&key.as_bytes()[..full_bytes + 1]);
            let mask = !((1u8 << (8 - extra_bits)) - 1);
            buf.as_bytes_mut()[full_bytes] = key.as_bytes()[full_bytes] & mask;
            xxhash_rust::xxh3::xxh3_64(&buf.as_bytes()[..full_bytes + 1])
        };
        (hash as usize) % self.durability.shard_count()
    }
    /// Flushes the durability backend.
    pub fn flush(&self) -> DbResult<()> {
        self.durability.flush()
    }
}
#[cfg(feature = "replication")]
impl<K: Key, const V: usize, H: WriteHook<K>> crate::replication::ReplicationTarget
for ConstTree<K, V, H, Bitcask>
{
fn apply_entry(
&self,
shard_id: u8,
file_id: u32,
entry_offset: u64,
header: &crate::entry::EntryHeader,
key: &[u8],
value: &[u8],
) -> DbResult<()> {
let key: K = K::from_bytes(key);
let value_offset =
entry_offset + size_of::<crate::entry::EntryHeader>() as u64 + size_of::<K>() as u64;
let disk = DiskLoc::new(
shard_id,
file_id as u16,
value_offset as u32,
header.value_len,
);
if header.is_tombstone() {
let guard = self.index.collector().enter();
self.index.remove(key.as_bytes(), &guard);
} else {
let value: [u8; V] = value.try_into().map_err(|_| DbError::CorruptedEntry {
offset: entry_offset,
})?;
let guard = self.index.collector().enter();
let height = random_height();
let node_ptr = ConstNode::alloc(key, value, disk, height);
match self.index.insert(node_ptr, &guard) {
InsertResult::Inserted => {}
InsertResult::Exists(existing) => {
existing.write_data(disk, &value);
unsafe {
ConstNode::<K, V>::dealloc_node(node_ptr);
}
}
}
}
Ok(())
}
fn try_apply_entry(
&self,
shard_id: u8,
file_id: u32,
entry_offset: u64,
header: &crate::entry::EntryHeader,
raw_after_header: &[u8],
) -> DbResult<bool> {
if raw_after_header.len() < size_of::<K>() + header.value_len as usize {
return Ok(false);
}
let key = &raw_after_header[..size_of::<K>()];
let value = &raw_after_header[size_of::<K>()..size_of::<K>() + header.value_len as usize];
let crc = crate::entry::compute_crc32(header.gsn, header.value_len, key, value);
if crc != header.crc32 {
return Ok(false);
}
self.apply_entry(shard_id, file_id, entry_offset, header, key, value)?;
Ok(true)
}
fn key_len(&self) -> usize {
size_of::<K>()
}
}
/// A handle to one locked shard, created by [`ConstTree::atomic`].
///
/// Holds both the shard's durability lock and a seize guard for the whole
/// closure, so the operations performed through it are atomic with respect
/// to other writers on the same shard.
pub struct ConstShard<'a, K: Key, const V: usize, H: WriteHook<K> = NoHook, D: Durability = Bitcask>
{
    // Back-reference to the owning tree (index + routing).
    tree: &'a ConstTree<K, V, H, D>,
    // Exclusive lock over this shard's durability state.
    inner: MutexGuard<'a, D::Inner>,
    // The shard this handle is pinned to; other shards' keys are rejected.
    shard_id: usize,
    // Memory-reclamation guard kept alive for the lock's duration.
    guard: seize::LocalGuard<'a>,
}
impl<K: Key, const V: usize, H: WriteHook<K>, D: Durability> ConstShard<'_, K, V, H, D> {
    /// Upserts under the held shard lock; no sync scheduling, no write hook.
    pub fn put(&mut self, key: &K, value: &[u8; V]) -> DbResult<Option<[u8; V]>> {
        self.check_shard(key)?;
        self.tree
            .put_locked(self.shard_id, &mut *self.inner, &self.guard, key, value)
    }
    /// Inserts-if-absent under the held shard lock.
    pub fn insert(&mut self, key: &K, value: &[u8; V]) -> DbResult<()> {
        self.check_shard(key)?;
        self.tree
            .insert_locked(self.shard_id, &mut *self.inner, &self.guard, key, value)
    }
    /// Deletes under the held shard lock, returning the previous value.
    pub fn delete(&mut self, key: &K) -> DbResult<Option<[u8; V]>> {
        self.check_shard(key)?;
        self.tree
            .delete_locked(self.shard_id, &mut *self.inner, &self.guard, key)
    }
    /// Reads a value. Note: reads are not shard-checked, since the index is
    /// global and reading other shards' keys is harmless.
    pub fn get(&self, key: &K) -> Option<[u8; V]> {
        let node = self.tree.index.get(key.as_bytes(), &self.guard)?;
        Some(node.read_value())
    }
    /// Like [`Self::get`] but maps a missing key to `DbError::KeyNotFound`.
    pub fn get_or_err(&self, key: &K) -> DbResult<[u8; V]> {
        self.get(key).ok_or(DbError::KeyNotFound)
    }
    /// Returns whether `key` is currently present.
    pub fn contains(&self, key: &K) -> bool {
        self.tree.index.get(key.as_bytes(), &self.guard).is_some()
    }
    /// Rejects mutations on keys that route to a different shard — their
    /// durability state is not covered by the lock this handle holds.
    fn check_shard(&self, key: &K) -> DbResult<()> {
        if self.tree.shard_for(key) != self.shard_id {
            return Err(DbError::ShardMismatch);
        }
        Ok(())
    }
}
/// Converts a borrowed `Bound<&K>` into an owned `Bound<K>` by copying the
/// referenced key, preserving the bound's variant.
fn bound_owned<K: Copy>(b: &Bound<&K>) -> Bound<K> {
    (*b).map(|k| *k)
}
/// Computes the exclusive upper bound that caps an ascending prefix scan.
///
/// The prefix is treated as a big-endian byte string and incremented by one;
/// if every byte is `0xFF` (or the prefix is empty) there is no representable
/// upper bound and the scan is unbounded above.
fn prefix_to_end_bound<K: Key>(prefix: &[u8]) -> Bound<K> {
    // Rightmost byte that can be bumped without overflowing.
    match prefix.iter().rposition(|&b| b != 0xFF) {
        // All bytes are 0xFF (or the prefix is empty): increment overflows.
        None => Bound::Unbounded,
        Some(pos) => {
            let mut end = K::zeroed();
            let bytes = end.as_bytes_mut();
            // Copy the unchanged head of the prefix, then bump the pivot byte.
            // Everything past `pos` stays 0x00 — exactly as if the trailing
            // 0xFF bytes had wrapped to 0x00 during the increment.
            bytes[..=pos].copy_from_slice(&prefix[..=pos]);
            bytes[pos] += 1;
            Bound::Excluded(end)
        }
    }
}
/// A double-ended iterator over a range of a [`ConstTree`]'s entries.
///
/// Holds a seize guard for its whole lifetime so the raw node pointers it
/// caches stay valid while other threads mutate the list.
pub struct ConstIter<'a, K: Key, const V: usize, L: Location = DiskLoc> {
    // The skip list being traversed (needed for backward searches).
    list: &'a SkipList<ConstNode<K, V, L>>,
    // Next node the forward direction will yield (null = exhausted).
    front: *mut ConstNode<K, V, L>,
    // Next node the backward direction will yield; `None` until the first
    // `next_back` call resolves it lazily from `end`.
    back: Option<*mut ConstNode<K, V, L>>,
    // Stop condition for the forward direction.
    end: Bound<K>,
    // Stop condition for the backward direction.
    start: Bound<K>,
    // Mirrors the tree's key ordering; flips the bound comparisons.
    reversed: bool,
    // Set once the two directions meet or a bound is crossed.
    done: bool,
    // Keeps reclaimed-memory protection alive for the iterator's lifetime.
    _guard: seize::LocalGuard<'a>,
}
// SAFETY: NOTE(review): the iterator only holds raw node pointers plus the
// seize guard that keeps those nodes from being reclaimed; sending it to
// another thread is presumed sound on that basis, but this relies on the
// guard's own cross-thread guarantees — confirm against seize's docs.
unsafe impl<K: Key, const V: usize, L: Location> Send for ConstIter<'_, K, V, L> {}
impl<K: Key, const V: usize, L: Location> Iterator for ConstIter<'_, K, V, L> {
    type Item = (K, [u8; V]);
    /// Yields the next live entry from the front, stopping at the `end`
    /// bound or when the front and back cursors converge.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if self.done || self.front.is_null() {
                return None;
            }
            let node = unsafe { &*self.front };
            // If both cursors point at the same node, this is the final item.
            let converged = self.back.is_some_and(|back| std::ptr::eq(self.front, back));
            // Advance before inspecting so a marked node is simply skipped.
            self.front = crate::skiplist::strip_mark(
                node.tower(0).load(std::sync::atomic::Ordering::Acquire),
            );
            if converged {
                self.done = true;
            }
            if node.is_marked() {
                // Logically deleted: skip, unless it was also the final node.
                if converged {
                    return None;
                }
                continue;
            }
            if !self.check_end(&node.key) {
                self.done = true;
                return None;
            }
            return Some((node.key, node.read_value()));
        }
    }
}
impl<K: Key, const V: usize, L: Location> DoubleEndedIterator for ConstIter<'_, K, V, L> {
    /// Yields the next live entry from the back, stopping at the `start`
    /// bound or when the back and front cursors converge.
    fn next_back(&mut self) -> Option<Self::Item> {
        if self.back.is_none() {
            // Lazily resolve the back cursor from the `end` bound on first use.
            self.back = Some(self.resolve_back());
            if self.front.is_null() {
                // Forward side already exhausted; nothing left to yield.
                self.done = true;
            }
        }
        loop {
            let back = self.back.unwrap_or(std::ptr::null_mut());
            if self.done || back.is_null() {
                return None;
            }
            let node = unsafe { &*back };
            let key = node.key;
            let converged = std::ptr::eq(self.front, back);
            // Step backward via a fresh search for the last node < key
            // (skip-list nodes have no back-pointers).
            self.back = Some(self.list.find_last_lt(key.as_bytes()));
            if converged {
                self.done = true;
            }
            if node.is_marked() {
                if converged {
                    return None;
                }
                continue;
            }
            if !self.check_start(&key) {
                self.done = true;
                return None;
            }
            return Some((key, node.read_value()));
        }
    }
}
impl<K: Key, const V: usize, L: Location> ConstIter<'_, K, V, L> {
    /// Resolves the initial back cursor from the `end` bound.
    ///
    /// `Included` lands on the exact (live) match if present, otherwise on
    /// the last node below it; `Excluded` on the last node strictly below;
    /// `Unbounded` on the list's final node.
    fn resolve_back(&self) -> *mut ConstNode<K, V, L> {
        match &self.end {
            Bound::Unbounded => self.list.find_last(),
            Bound::Excluded(k) => self.list.find_last_lt(k.as_bytes()),
            Bound::Included(k) => {
                let ge = self.list.find_first_ge(k.as_bytes(), &self._guard);
                if !ge.is_null()
                    && !unsafe { &*ge }.is_marked()
                    && unsafe { &*ge }.key_bytes() == k.as_bytes()
                {
                    ge
                } else {
                    self.list.find_last_lt(k.as_bytes())
                }
            }
        }
    }
    /// Returns whether `key` is still inside the forward stop bound.
    ///
    /// On a reversed tree the stored order is descending, so the byte
    /// comparisons flip direction.
    #[inline(always)]
    fn check_end(&self, key: &K) -> bool {
        match &self.end {
            Bound::Unbounded => true,
            Bound::Excluded(end) => {
                if self.reversed {
                    key.as_bytes() > end.as_bytes()
                } else {
                    key.as_bytes() < end.as_bytes()
                }
            }
            Bound::Included(end) => {
                if self.reversed {
                    key.as_bytes() >= end.as_bytes()
                } else {
                    key.as_bytes() <= end.as_bytes()
                }
            }
        }
    }
    /// Returns whether `key` is still inside the backward stop bound
    /// (mirror image of [`Self::check_end`]).
    #[inline(always)]
    fn check_start(&self, key: &K) -> bool {
        match &self.start {
            Bound::Unbounded => true,
            Bound::Excluded(s) => {
                if self.reversed {
                    key.as_bytes() < s.as_bytes()
                } else {
                    key.as_bytes() > s.as_bytes()
                }
            }
            Bound::Included(s) => {
                if self.reversed {
                    key.as_bytes() <= s.as_bytes()
                } else {
                    key.as_bytes() >= s.as_bytes()
                }
            }
        }
    }
}
impl<K: Key, const V: usize, L: Location> ConstIter<'_, K, V, L> {
    /// Drains the remaining entries of this iterator into a `Vec`.
    pub fn collect_vec(&mut self) -> Vec<(K, [u8; V])> {
        let mut entries = Vec::new();
        while let Some(entry) = self.next() {
            entries.push(entry);
        }
        entries
    }
}
#[cfg(all(test, feature = "replication"))]
mod replication_helper_tests {
use crate::FixedConfig;
use crate::fixed::FixedTree;
use tempfile::tempdir;
fn cfg() -> FixedConfig {
FixedConfig {
shard_count: 2,
grow_step: 64,
..FixedConfig::test()
}
}
#[test]
fn get_slot_id_present_and_absent() {
let dir = tempdir().unwrap();
let tree = FixedTree::<[u8; 8], 8>::open(dir.path(), cfg()).unwrap();
let key = 1u64.to_be_bytes();
let value = 42u64.to_be_bytes();
tree.put(&key, &value).unwrap();
let slot = tree.get_slot_id(&key).expect("present key must resolve");
assert_eq!(tree.get_slot_id(&key), Some(slot));
let missing = 9999u64.to_be_bytes();
assert_eq!(tree.get_slot_id(&missing), None);
}
#[test]
fn remove_key_if_slot_matches_matching_and_nonmatching() {
let dir = tempdir().unwrap();
let tree = FixedTree::<[u8; 8], 8>::open(dir.path(), cfg()).unwrap();
let key = 7u64.to_be_bytes();
let value = 7u64.to_be_bytes();
tree.put(&key, &value).unwrap();
let slot = tree.get_slot_id(&key).unwrap();
assert!(!tree.remove_key_if_slot_matches(&key, slot.wrapping_add(1)));
assert!(tree.contains(&key));
assert!(tree.remove_key_if_slot_matches(&key, slot));
assert!(!tree.contains(&key));
assert!(!tree.remove_key_if_slot_matches(&key, slot));
}
#[test]
fn upsert_replicated_insert_and_update() {
let dir = tempdir().unwrap();
let tree = FixedTree::<[u8; 8], 8>::open(dir.path(), cfg()).unwrap();
let key = 3u64.to_be_bytes();
let value_a = 100u64.to_be_bytes();
let value_b = 200u64.to_be_bytes();
tree.upsert_replicated(&key, value_a, 77);
assert_eq!(tree.get(&key), Some(value_a));
assert_eq!(tree.get_slot_id(&key), Some(77));
tree.upsert_replicated(&key, value_b, 77);
assert_eq!(tree.get(&key), Some(value_b));
assert_eq!(tree.get_slot_id(&key), Some(77));
let value_c = 300u64.to_be_bytes();
tree.upsert_replicated(&key, value_c, 123);
assert_eq!(tree.get(&key), Some(value_c));
assert_eq!(tree.get_slot_id(&key), Some(123));
}
}