use std::marker::PhantomData;
use std::ops::Bound;
use crate::Key;
use crate::byte_view::ByteView;
use crate::codec::Codec;
use crate::compaction::CompactionIndex;
use crate::config::Config;
use crate::disk_loc::DiskLoc;
use crate::error::{DbError, DbResult};
use crate::hook::{NoHook, TypedWriteHook, VarTypedHookAdapter};
use crate::var_tree::{VarIter, VarShard, VarTree};
/// Typed wrapper around a [`VarTree`].
///
/// Values of type `T` are turned into bytes (and back) with the codec `C`
/// before they reach the inner tree. The typed write hook `H` is wrapped in a
/// [`VarTypedHookAdapter`] and defaults to [`NoHook`].
pub struct VarTypedTree<K, T, C, H = NoHook>
where
    K: Key,
    T: Send + Sync,
    C: Codec<T> + Clone,
    H: TypedWriteHook<K, T>,
{
    /// Byte-oriented tree that performs the actual storage operations.
    inner: VarTree<K, VarTypedHookAdapter<K, T, C, H>>,
    /// Codec used by this wrapper for every encode/decode.
    codec: C,
    /// Ties the wrapper to `T` without storing a `T`.
    _marker: PhantomData<fn() -> T>,
}
impl<K, T, C> VarTypedTree<K, T, C>
where
    K: Key,
    T: Send + Sync,
    C: Codec<T> + Clone,
{
    /// Opens a typed tree at `path` without a write hook (`NoHook` is used).
    ///
    /// # Errors
    /// Returns whatever error the hooked constructor reports.
    pub fn open(path: impl AsRef<std::path::Path>, config: Config, codec: C) -> DbResult<Self> {
        // `open_hooked` forwards straight to the shared private constructor.
        Self::open_hooked(path, config, codec, NoHook)
    }
}
impl<K: Key, T: Send + Sync, C: Codec<T> + Clone, H: TypedWriteHook<K, T>>
VarTypedTree<K, T, C, H>
{
pub fn open_hooked(
path: impl AsRef<std::path::Path>,
config: Config,
codec: C,
hook: H,
) -> DbResult<Self> {
Self::open_hooked_inner(path, config, codec, hook)
}
fn open_hooked_inner(
path: impl AsRef<std::path::Path>,
config: Config,
codec: C,
hook: H,
) -> DbResult<Self> {
let adapter = VarTypedHookAdapter {
inner: hook,
codec: codec.clone(),
_marker: PhantomData,
};
let inner = VarTree::open_hooked(path, config, adapter)?;
Ok(Self {
inner,
codec,
_marker: PhantomData,
})
}
pub fn close(self) -> DbResult<()> {
self.inner.close()
}
pub fn flush_buffers(&self) -> DbResult<()> {
self.inner.flush_buffers()
}
pub fn config(&self) -> &Config {
self.inner.config()
}
pub fn compact(&self) -> DbResult<usize> {
self.inner.compact()
}
pub fn sync_hints(&self) -> DbResult<()> {
self.inner.sync_hints()
}
pub fn warmup(&self) -> DbResult<()> {
self.inner.warmup()
}
pub fn as_inner(&self) -> &VarTree<K, VarTypedHookAdapter<K, T, C, H>> {
&self.inner
}
pub fn codec(&self) -> &C {
&self.codec
}
pub fn get(&self, key: &K) -> Option<T> {
let bytes = self.inner.get(key)?;
self.codec.decode_from(&bytes).ok()
}
pub fn get_or_err(&self, key: &K) -> DbResult<T> {
self.get(key).ok_or(DbError::KeyNotFound)
}
pub fn contains(&self, key: &K) -> bool {
self.inner.contains(key)
}
pub fn first(&self) -> Option<(K, T)> {
let (k, bytes) = self.inner.first()?;
let v = self.codec.decode_from(&bytes).ok()?;
Some((k, v))
}
pub fn last(&self) -> Option<(K, T)> {
let (k, bytes) = self.inner.last()?;
let v = self.codec.decode_from(&bytes).ok()?;
Some((k, v))
}
pub fn put(&self, key: &K, value: &T) -> DbResult<()> {
let mut buf = Vec::new();
self.codec.encode_to(value, &mut buf);
self.inner.put(key, &buf)
}
pub fn insert(&self, key: &K, value: &T) -> DbResult<()> {
let mut buf = Vec::new();
self.codec.encode_to(value, &mut buf);
self.inner.insert(key, &buf)
}
pub fn delete(&self, key: &K) -> DbResult<bool> {
self.inner.delete(key)
}
pub fn cas(&self, key: &K, expected: &T, new_value: &T) -> DbResult<()> {
let mut exp_buf = Vec::new();
self.codec.encode_to(expected, &mut exp_buf);
let mut new_buf = Vec::new();
self.codec.encode_to(new_value, &mut new_buf);
self.inner.cas(key, &exp_buf, &new_buf)
}
pub fn update(&self, key: &K, f: impl FnOnce(&T) -> T) -> DbResult<Option<T>> {
use std::cell::Cell;
let out: Cell<Option<T>> = Cell::new(None);
let result = self.inner.update(key, |bytes| {
let Ok(current) = self.codec.decode_from(bytes) else {
return ByteView::new(bytes);
};
let new_val = f(¤t);
let mut buf = Vec::new();
self.codec.encode_to(&new_val, &mut buf);
out.set(Some(new_val));
ByteView::from_vec(buf)
})?;
if result.is_none() {
return Ok(None);
}
Ok(out.into_inner())
}
pub fn fetch_update(&self, key: &K, f: impl FnOnce(&T) -> T) -> DbResult<Option<T>> {
use std::cell::Cell;
let out: Cell<Option<T>> = Cell::new(None);
let result = self.inner.fetch_update(key, |bytes| {
let Ok(current) = self.codec.decode_from(bytes) else {
return ByteView::new(bytes);
};
let new_val = f(¤t);
let mut buf = Vec::new();
self.codec.encode_to(&new_val, &mut buf);
out.set(Some(current));
ByteView::from_vec(buf)
})?;
if result.is_none() {
return Ok(None);
}
Ok(out.into_inner())
}
pub fn atomic<R>(
&self,
shard_key: &K,
f: impl FnOnce(&mut VarTypedShard<'_, K, T, C, H>) -> DbResult<R>,
) -> DbResult<R> {
self.inner.atomic(shard_key, |var_shard| {
let inner_ptr: *mut () = (var_shard as *mut VarShard<'_, _, _>).cast();
let mut shard = VarTypedShard {
tree: self,
inner: inner_ptr,
_marker: PhantomData,
};
f(&mut shard)
})
}
pub fn prefix_iter(&self, prefix: &[u8]) -> VarTypedIter<'_, K, T, C, H> {
VarTypedIter {
inner: self.inner.prefix_iter(prefix),
codec: &self.codec,
_marker: PhantomData,
}
}
pub fn iter(&self) -> VarTypedIter<'_, K, T, C, H> {
VarTypedIter {
inner: self.inner.iter(),
codec: &self.codec,
_marker: PhantomData,
}
}
pub fn range(&self, start: &K, end: &K) -> VarTypedIter<'_, K, T, C, H> {
VarTypedIter {
inner: self.inner.range(start, end),
codec: &self.codec,
_marker: PhantomData,
}
}
pub fn range_bounds(&self, start: Bound<&K>, end: Bound<&K>) -> VarTypedIter<'_, K, T, C, H> {
VarTypedIter {
inner: self.inner.range_bounds(start, end),
codec: &self.codec,
_marker: PhantomData,
}
}
pub fn len(&self) -> usize {
self.inner.len()
}
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
pub fn shard_for(&self, key: &K) -> usize {
self.inner.shard_for(key)
}
pub fn migrate(&self, f: impl Fn(&K, &T) -> crate::MigrateAction<T>) -> DbResult<usize> {
use crate::MigrateAction;
self.inner
.migrate(|key, bytes| match self.codec.decode_from(bytes) {
Ok(current) => match f(key, ¤t) {
MigrateAction::Keep => MigrateAction::Keep,
MigrateAction::Update(new) => {
let mut buf = Vec::new();
self.codec.encode_to(&new, &mut buf);
MigrateAction::Update(ByteView::from_vec(buf))
}
MigrateAction::Delete => MigrateAction::Delete,
},
Err(_) => {
tracing::warn!("var_typed_tree migrate: decode error, keeping entry");
MigrateAction::Keep
}
})
}
}
/// Compaction hooks: every call is forwarded unchanged to the inner tree.
impl<K, T, C, H> CompactionIndex<K> for VarTypedTree<K, T, C, H>
where
    K: Key,
    T: Send + Sync,
    C: Codec<T> + Clone,
    H: TypedWriteHook<K, T>,
{
    /// Forwards to the inner tree's `update_if_match`.
    fn update_if_match(&self, key: &K, old_loc: DiskLoc, new_loc: DiskLoc) -> bool {
        let tree = &self.inner;
        tree.update_if_match(key, old_loc, new_loc)
    }

    /// Forwards to the inner tree's `invalidate_blocks`.
    fn invalidate_blocks(&self, shard_id: u8, file_id: u32, total_bytes: u64) {
        let tree = &self.inner;
        tree.invalidate_blocks(shard_id, file_id, total_bytes);
    }

    /// Answers via the inner tree's `contains`.
    fn contains_key(&self, key: &K) -> bool {
        let tree = &self.inner;
        tree.contains(key)
    }
}
/// Replication entry points: raw entries are handed to the inner tree as-is,
/// without passing through the typed codec.
#[cfg(feature = "replication")]
impl<K, T, C, H> crate::replication::ReplicationTarget for VarTypedTree<K, T, C, H>
where
    K: Key,
    T: Send + Sync,
    C: Codec<T> + Clone,
    H: TypedWriteHook<K, T>,
{
    /// Forwards to the inner tree's `apply_entry`.
    fn apply_entry(
        &self,
        shard_id: u8,
        file_id: u32,
        entry_offset: u64,
        header: &crate::entry::EntryHeader,
        key: &[u8],
        value: &[u8],
    ) -> DbResult<()> {
        let target = &self.inner;
        target.apply_entry(shard_id, file_id, entry_offset, header, key, value)
    }

    /// Forwards to the inner tree's `try_apply_entry`.
    fn try_apply_entry(
        &self,
        shard_id: u8,
        file_id: u32,
        entry_offset: u64,
        header: &crate::entry::EntryHeader,
        raw_after_header: &[u8],
    ) -> DbResult<bool> {
        let target = &self.inner;
        target.try_apply_entry(shard_id, file_id, entry_offset, header, raw_after_header)
    }

    /// Returns the inner tree's `key_len()`.
    fn key_len(&self) -> usize {
        let target = &self.inner;
        target.key_len()
    }
}
/// Iterator adapter that decodes the byte values produced by a [`VarIter`]
/// into `T` using a borrowed codec.
pub struct VarTypedIter<'a, K, T, C, H>
where
    K: Key,
    T: Send + Sync,
    C: Codec<T> + Clone,
    H: TypedWriteHook<K, T>,
{
    /// Byte-level iterator being wrapped.
    inner: VarIter<'a, K, VarTypedHookAdapter<K, T, C, H>>,
    /// Codec borrowed from the owning tree.
    codec: &'a C,
    /// Ties the iterator to `T` without storing a `T`.
    _marker: PhantomData<fn() -> T>,
}
impl<K, T, C, H> Iterator for VarTypedIter<'_, K, T, C, H>
where
    K: Key,
    T: Send + Sync,
    C: Codec<T> + Clone,
    H: TypedWriteHook<K, T>,
{
    type Item = (K, T);

    /// Yields the next entry whose value decodes successfully.
    ///
    /// Entries that fail to decode are skipped without reporting the error.
    fn next(&mut self) -> Option<Self::Item> {
        // Copy the shared codec reference out so the closure does not need to
        // borrow `self` while `self.inner` is being advanced.
        let codec = self.codec;
        self.inner
            .find_map(|(key, raw)| codec.decode_from(&raw).ok().map(|value| (key, value)))
    }
}
impl<K, T, C, H> DoubleEndedIterator for VarTypedIter<'_, K, T, C, H>
where
    K: Key,
    T: Send + Sync,
    C: Codec<T> + Clone,
    H: TypedWriteHook<K, T>,
{
    /// Yields, from the back, the next entry whose value decodes successfully.
    ///
    /// Entries that fail to decode are skipped without reporting the error.
    fn next_back(&mut self) -> Option<Self::Item> {
        loop {
            // `?` ends the iteration once the inner iterator is exhausted.
            let (key, raw) = self.inner.next_back()?;
            match self.codec.decode_from(&raw) {
                Ok(value) => return Some((key, value)),
                Err(_) => continue,
            }
        }
    }
}
/// Typed handle passed to the closure of `VarTypedTree::atomic`.
///
/// It wraps the byte-level `VarShard` that the inner tree hands out, storing
/// it as a type-erased raw pointer.
pub struct VarTypedShard<'tree, K, T, C, H>
where
    K: Key,
    T: Send + Sync,
    C: Codec<T> + Clone,
    H: TypedWriteHook<K, T>,
{
    /// Owning typed tree; used for access to its codec.
    tree: &'tree VarTypedTree<K, T, C, H>,
    /// Type-erased pointer to the `VarShard` this handle wraps.
    inner: *mut (),
    /// Marks the handle as holding a `'tree`-scoped mutable borrow.
    _marker: PhantomData<&'tree mut ()>,
}
// The `*mut ()` field makes `VarTypedShard` `!Send` by default; this impl
// opts back in.
// NOTE(review): soundness presumably relies on the pointed-to `VarShard`
// being safe to use from another thread — confirm against `var_tree`.
unsafe impl<K: Key, T: Send + Sync, C: Codec<T> + Clone, H: TypedWriteHook<K, T>> Send
for VarTypedShard<'_, K, T, C, H>
{
}
impl<K, T, C, H> VarTypedShard<'_, K, T, C, H>
where
    K: Key,
    T: Send + Sync,
    C: Codec<T> + Clone,
    H: TypedWriteHook<K, T>,
{
    /// Reborrows the wrapped byte-level shard mutably.
    fn inner_mut(&mut self) -> &mut VarShard<'_, K, VarTypedHookAdapter<K, T, C, H>> {
        let shard = self
            .inner
            .cast::<VarShard<'_, K, VarTypedHookAdapter<K, T, C, H>>>();
        // SAFETY: `inner` is set from a live `&mut VarShard` when the handle
        // is built, and `&mut self` guarantees exclusive access through this
        // handle. NOTE(review): relies on the handle never outliving that
        // borrow — confirm at the construction site.
        unsafe { &mut *shard }
    }

    /// Reborrows the wrapped byte-level shard immutably.
    fn inner_ref(&self) -> &VarShard<'_, K, VarTypedHookAdapter<K, T, C, H>> {
        let shard = self
            .inner
            .cast::<VarShard<'_, K, VarTypedHookAdapter<K, T, C, H>>>();
        // SAFETY: same provenance as in `inner_mut`; only a shared reference
        // is produced here. NOTE(review): same lifetime assumption applies.
        unsafe { &*shard }
    }

    /// Encodes `value` with the tree's codec and forwards to the shard's `put`.
    pub fn put(&mut self, key: &K, value: &T) -> DbResult<()> {
        let mut encoded = Vec::new();
        self.tree.codec.encode_to(value, &mut encoded);
        let shard = self.inner_mut();
        shard.put(key, &encoded)
    }

    /// Encodes `value` with the tree's codec and forwards to the shard's `insert`.
    pub fn insert(&mut self, key: &K, value: &T) -> DbResult<()> {
        let mut encoded = Vec::new();
        self.tree.codec.encode_to(value, &mut encoded);
        let shard = self.inner_mut();
        shard.insert(key, &encoded)
    }

    /// Forwards to the shard's `delete` and returns its flag unchanged.
    pub fn delete(&mut self, key: &K) -> DbResult<bool> {
        let shard = self.inner_mut();
        shard.delete(key)
    }

    /// Reads `key` from the shard and decodes it.
    ///
    /// Returns `None` when the shard has no value for `key` or when decoding
    /// fails; the decode error is discarded.
    pub fn get(&self, key: &K) -> Option<T> {
        self.inner_ref()
            .get(key)
            .and_then(|raw| self.tree.codec.decode_from(&raw).ok())
    }

    /// Like `get`, but reports `None` as `DbError::KeyNotFound` (which
    /// therefore also covers values that fail to decode).
    pub fn get_or_err(&self, key: &K) -> DbResult<T> {
        match self.get(key) {
            Some(value) => Ok(value),
            None => Err(DbError::KeyNotFound),
        }
    }

    /// Forwards to the shard's `contains`; the value is not decoded.
    pub fn contains(&self, key: &K) -> bool {
        let shard = self.inner_ref();
        shard.contains(key)
    }
}
/// Exposes the typed tree as an `armour` collection named after `T::NAME`.
#[cfg(feature = "armour")]
impl<T, C, H> crate::armour::collection::Collection for VarTypedTree<T::SelfId, T, C, H>
where
    T: crate::CollectionMeta + Send + Sync,
    C: Codec<T> + Clone + 'static,
    H: TypedWriteHook<T::SelfId, T>,
    T::SelfId: crate::Key + Ord,
{
    /// Collection name, taken from `T::NAME`.
    fn name(&self) -> &str {
        T::NAME
    }

    /// Entry count as reported by the inner tree.
    fn len(&self) -> usize {
        // Goes to the inner tree directly, which is what the inherent `len`
        // does, so this trait method visibly cannot call itself.
        self.inner.len()
    }

    /// Runs compaction on the inner tree.
    fn compact(&self) -> DbResult<usize> {
        // Same reasoning as `len`: the inherent `compact` is this delegation.
        self.inner.compact()
    }
}